Skip to content

Commit

Permalink
[eclipse-iceoryx#507] Introduce CallbackProgression in all wait_and_p…
Browse files Browse the repository at this point in the history
…rocess calls; rename try_wait_and_process into wait_and_process_once
  • Loading branch information
elfenpiff committed Nov 13, 2024
1 parent 6d77116 commit 9b38d96
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 71 deletions.
7 changes: 2 additions & 5 deletions iceoryx2-bb/posix/src/deadline_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,15 @@ impl DeadlineQueue {

/// Iterates over all missed deadlines and calls the provided callback for each of them
/// and provide the [`DeadlineQueueIndex`] to identify them.
pub fn missed_deadlines<F: FnMut(DeadlineQueueIndex)>(
pub fn missed_deadlines<F: FnMut(DeadlineQueueIndex) -> CallbackProgression>(
&self,
mut call: F,
) -> Result<(), TimeError> {
let now = fail!(from self, when Time::now_with_clock(self.clock_type),
"Unable to return next duration since the current time could not be acquired.");

let now = now.as_duration().as_nanos();
self.handle_missed_deadlines(now, |idx| {
call(idx);
CallbackProgression::Continue
});
self.handle_missed_deadlines(now, |idx| -> CallbackProgression { call(idx) });

Ok(())
}
Expand Down
45 changes: 38 additions & 7 deletions iceoryx2-bb/posix/tests/deadline_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// SPDX-License-Identifier: Apache-2.0 OR MIT

mod deadline_queue {
use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_posix::deadline_queue::*;
use iceoryx2_bb_testing::assert_that;
use std::time::Duration;
Expand Down Expand Up @@ -85,8 +86,11 @@ mod deadline_queue {
.unwrap();

let mut missed_deadline_queues = vec![];
sut.missed_deadlines(|idx| missed_deadline_queues.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadline_queues.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadline_queues, len 0);
}
Expand All @@ -104,8 +108,11 @@ mod deadline_queue {
std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| missed_deadlines.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadlines, len 1);
assert_that!(missed_deadlines, contains _guard_1.index());
Expand All @@ -122,15 +129,38 @@ mod deadline_queue {
std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| missed_deadlines.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadlines, len 3);
assert_that!(missed_deadlines, contains guard_1.index());
assert_that!(missed_deadlines, contains guard_2.index());
assert_that!(missed_deadlines, contains guard_3.index());
}

#[test]
fn missed_deadline_iteration_stops_when_requested() {
let sut = DeadlineQueueBuilder::new().create().unwrap();

let _guard_1 = sut.add_deadline_interval(Duration::from_nanos(1)).unwrap();
let _guard_2 = sut.add_deadline_interval(Duration::from_nanos(10)).unwrap();
let _guard_3 = sut.add_deadline_interval(Duration::from_nanos(20)).unwrap();

std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Stop
})
.unwrap();

assert_that!(missed_deadlines, len 1);
}

#[test]
fn duration_until_next_deadline_is_zero_if_deadline_is_already_missed() {
let sut = DeadlineQueueBuilder::new().create().unwrap();
Expand All @@ -149,7 +179,8 @@ mod deadline_queue {
let mut deadline_idx = None;
sut.missed_deadlines(|idx| {
missed_deadline_counter += 1;
deadline_idx = Some(idx)
deadline_idx = Some(idx);
CallbackProgression::Continue
})
.unwrap();

Expand Down
92 changes: 41 additions & 51 deletions iceoryx2/src/port/waitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ use std::{
sync::atomic::Ordering, time::Duration,
};

use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_log::fail;
use iceoryx2_bb_posix::{
deadline_queue::{DeadlineQueue, DeadlineQueueBuilder, DeadlineQueueGuard, DeadlineQueueIndex},
Expand All @@ -195,7 +196,7 @@ use iceoryx2_bb_posix::{
signal::SignalHandler,
};
use iceoryx2_cal::reactor::*;
use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicUsize};
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize;

/// States why the [`WaitSet::wait_and_process()`] method returned.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
Expand All @@ -206,6 +207,8 @@ pub enum WaitSetRunResult {
Interrupt,
/// The user explicitly called [`WaitSet::stop()`].
StopRequest,
/// All events were handled.
AllEventsHandled,
}

/// Defines the failures that can occur when attaching something with
Expand Down Expand Up @@ -237,10 +240,6 @@ pub enum WaitSetRunError {
InternalError,
/// Waiting on an empty [`WaitSet`] would lead to a deadlock therefore it causes an error.
NoAttachments,
/// A termination signal `SIGTERM` was received.
TerminationRequest,
/// An interrupt signal `SIGINT` was received.
Interrupt,
}

impl std::fmt::Display for WaitSetRunError {
Expand Down Expand Up @@ -467,7 +466,6 @@ impl WaitSetBuilder {
attachment_to_deadline: RefCell::new(HashMap::new()),
deadline_to_attachment: RefCell::new(HashMap::new()),
attachment_counter: IoxAtomicUsize::new(0),
keep_running: IoxAtomicBool::new(true),
}),
Err(ReactorCreateError::UnknownError(e)) => {
fail!(from self, with WaitSetCreateError::InternalError,
Expand All @@ -494,7 +492,6 @@ pub struct WaitSet<Service: crate::service::Service> {
attachment_to_deadline: RefCell<HashMap<i32, DeadlineQueueIndex>>,
deadline_to_attachment: RefCell<HashMap<DeadlineQueueIndex, i32>>,
attachment_counter: IoxAtomicUsize,
keep_running: IoxAtomicBool,
}

impl<Service: crate::service::Service> WaitSet<Service> {
Expand Down Expand Up @@ -537,34 +534,41 @@ impl<Service: crate::service::Service> WaitSet<Service> {
}
}

fn handle_deadlines<F: FnMut(WaitSetAttachmentId<Service>)>(
fn handle_deadlines<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
fn_call: &mut F,
error_msg: &str,
) -> Result<(), WaitSetRunError> {
) -> Result<WaitSetRunResult, WaitSetRunError> {
let deadline_to_attachment = self.deadline_to_attachment.borrow();
let call = |idx: DeadlineQueueIndex| {
if let Some(reactor_idx) = deadline_to_attachment.get(&idx) {
fn_call(WaitSetAttachmentId::deadline(self, *reactor_idx, idx));
let mut result = WaitSetRunResult::AllEventsHandled;
let call = |idx: DeadlineQueueIndex| -> CallbackProgression {
let progression = if let Some(reactor_idx) = deadline_to_attachment.get(&idx) {
fn_call(WaitSetAttachmentId::deadline(self, *reactor_idx, idx))
} else {
fn_call(WaitSetAttachmentId::tick(self, idx));
fn_call(WaitSetAttachmentId::tick(self, idx))
};

if let CallbackProgression::Stop = progression {
result = WaitSetRunResult::StopRequest;
}

progression
};

fail!(from self,
when self.deadline_queue.missed_deadlines(call),
with WaitSetRunError::InternalError,
"{error_msg} since the missed deadlines could not be acquired.");

Ok(())
Ok(result)
}

fn handle_all_attachments<F: FnMut(WaitSetAttachmentId<Service>)>(
fn handle_all_attachments<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
triggered_file_descriptors: &Vec<i32>,
fn_call: &mut F,
error_msg: &str,
) -> Result<(), WaitSetRunError> {
) -> Result<WaitSetRunResult, WaitSetRunError> {
// we need to reset the deadlines first, otherwise a long fn_call may extend the
// deadline unintentionally
let mut fd_and_deadline_queue_idx = Vec::with_capacity(triggered_file_descriptors.len());
Expand All @@ -575,13 +579,20 @@ impl<Service: crate::service::Service> WaitSet<Service> {

// must be called after the deadlines have been reset, in the case that the
// event has been received shortly before the deadline ended.
self.handle_deadlines(fn_call, error_msg)?;

match self.handle_deadlines(fn_call, error_msg)? {
WaitSetRunResult::AllEventsHandled => (),
v => return Ok(v),
};

for fd in triggered_file_descriptors {
fn_call(WaitSetAttachmentId::notification(self, *fd));
if let CallbackProgression::Stop = fn_call(WaitSetAttachmentId::notification(self, *fd))
{
return Ok(WaitSetRunResult::StopRequest);
}
}

Ok(())
Ok(WaitSetRunResult::AllEventsHandled)
}

/// Attaches an object as notification to the [`WaitSet`]. Whenever an event is received on the
Expand Down Expand Up @@ -646,51 +657,39 @@ impl<Service: crate::service::Service> WaitSet<Service> {
})
}

/// Can be called from within a callback during [`WaitSet::wait_and_process()`] to signal the [`WaitSet`]
/// to stop running after this iteration.
pub fn stop(&self) {
self.keep_running.store(false, Ordering::Relaxed);
}

/// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every
/// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to
/// acquire the source.
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user via [`WaitSetRunResult`].
pub fn wait_and_process<F: FnMut(WaitSetAttachmentId<Service>)>(
pub fn wait_and_process<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
mut fn_call: F,
) -> Result<WaitSetRunResult, WaitSetRunError> {
while self.keep_running.load(Ordering::Relaxed) {
match self.try_wait_and_process(&mut fn_call) {
Ok(()) => (),
Err(WaitSetRunError::TerminationRequest) => {
return Ok(WaitSetRunResult::TerminationRequest)
}
Err(WaitSetRunError::Interrupt) => return Ok(WaitSetRunResult::Interrupt),
loop {
match self.wait_and_process_once(&mut fn_call) {
Ok(WaitSetRunResult::AllEventsHandled) => (),
Ok(v) => return Ok(v),
Err(e) => {
fail!(from self, with e,
"Unable to run in WaitSet::wait_and_process() loop since ({:?}) has occurred.", e);
}
}
}

Ok(WaitSetRunResult::StopRequest)
}

/// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that
/// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the
/// source.
/// If nothing was triggered the [`WaitSet`] returns immediately.
pub fn try_wait_and_process<F: FnMut(WaitSetAttachmentId<Service>)>(
pub fn wait_and_process_once<F: FnMut(WaitSetAttachmentId<Service>) -> CallbackProgression>(
&self,
mut fn_call: F,
) -> Result<(), WaitSetRunError> {
) -> Result<WaitSetRunResult, WaitSetRunError> {
let msg = "Unable to call WaitSet::try_wait_and_process()";

if SignalHandler::termination_requested() {
fail!(from self, with WaitSetRunError::TerminationRequest,
"{msg} since a termination request was received.");
return Ok(WaitSetRunResult::TerminationRequest);
}

if self.is_empty() {
Expand Down Expand Up @@ -719,18 +718,9 @@ impl<Service: crate::service::Service> WaitSet<Service> {
};

match reactor_wait_result {
Ok(0) => {
self.handle_deadlines(&mut fn_call, msg)?;
Ok(())
}
Ok(_) => {
self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg)?;
Ok(())
}
Err(ReactorWaitError::Interrupt) => {
fail!(from self, with WaitSetRunError::Interrupt,
"{msg} since an interrupt signal was received.");
}
Ok(0) => self.handle_deadlines(&mut fn_call, msg),
Ok(_) => self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg),
Err(ReactorWaitError::Interrupt) => return Ok(WaitSetRunResult::Interrupt),
Err(ReactorWaitError::InsufficientPermissions) => {
fail!(from self, with WaitSetRunError::InsufficientPermissions,
"{msg} due to insufficient permissions.");
Expand Down
Loading

0 comments on commit 9b38d96

Please sign in to comment.