Skip to content

Commit

Permalink
Merge pull request #598 from elfenpiff/iox2-508-event-concept-based-o…
Browse files Browse the repository at this point in the history
…n-socketpair

[#508] event concept based on socketpair
  • Loading branch information
elfenpiff authored Jan 21, 2025
2 parents dd42fc9 + 556c618 commit 3c86695
Show file tree
Hide file tree
Showing 10 changed files with 707 additions and 29 deletions.
2 changes: 2 additions & 0 deletions doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

* Introduce `socket_pair` abstraction in POSIX wrapper
[#508](https://github.com/eclipse-iceoryx/iceoryx2/issues/508)
* Introduce `socket_pair` event concept
[#508](https://github.com/eclipse-iceoryx/iceoryx2/issues/508)
* Deadline property for event services
[#573](https://github.com/eclipse-iceoryx/iceoryx2/issues/573)
* Use 'std_instead_of_core' clippy warning
Expand Down
77 changes: 63 additions & 14 deletions iceoryx2-bb/posix/src/socket_pair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,25 @@ use crate::{

const BLOCKING_TIMEOUT: Duration = Duration::from_secs(i16::MAX as _);

/// Defines the errors that can occur when a socket pair is created with
/// [`StreamingSocket::create_pair()`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StreamingSocketDuplicateError {
PerProcessFileHandleLimitReached,
Interrupt,
FileDescriptorBroken,
UnknownError(i32),
}

impl From<FcntlError> for StreamingSocketDuplicateError {
fn from(value: FcntlError) -> Self {
match value {
FcntlError::Interrupt => StreamingSocketDuplicateError::Interrupt,
FcntlError::UnknownError(v) => StreamingSocketDuplicateError::UnknownError(v),
}
}
}

/// Defines the errors that can occur when a socket pair is created with
/// [`StreamingSocket::create_pair()`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -87,6 +106,7 @@ pub enum StreamingSocketPairSendError {
InsufficientResources,
Interrupt,
ConnectionReset,
Disconnected,
UnknownError(i32),
}

Expand Down Expand Up @@ -166,11 +186,26 @@ impl SynchronousMultiplexing for StreamingSocket {}
unsafe impl Send for StreamingSocket {}

impl StreamingSocket {
fn create_type_safe_fd(
raw_fd: i32,
origin: &str,
msg: &str,
) -> Result<FileDescriptor, StreamingSocketPairCreationError> {
match FileDescriptor::new(raw_fd) {
Some(fd) => Ok(fd),
None => {
fail!(from origin,
with StreamingSocketPairCreationError::FileDescriptorBroken,
"This should never happen! {msg} since the socketpair implementation returned a broken file descriptor.");
}
}
}

/// Creates a new [`StreamingSocket`] pair.
pub fn create_pair(
) -> Result<(StreamingSocket, StreamingSocket), StreamingSocketPairCreationError> {
let msg = "Unable to create streaming socket pair";
let origin = "StreamingSocketPairBuilder::create()";
let origin = "StreamingSocket::create_pair()";
let mut fd_values = [0, 0];

if unsafe {
Expand All @@ -182,19 +217,8 @@ impl StreamingSocket {
)
} == 0
{
let create_fd = |fd| -> Result<FileDescriptor, StreamingSocketPairCreationError> {
match FileDescriptor::new(fd) {
Some(fd) => Ok(fd),
None => {
fail!(from origin,
with StreamingSocketPairCreationError::FileDescriptorBroken,
"This should never happen! {msg} since the socketpair implementation returned a broken file descriptor.");
}
}
};

let fd_1 = create_fd(fd_values[0])?;
let fd_2 = create_fd(fd_values[1])?;
let fd_1 = Self::create_type_safe_fd(fd_values[0], origin, msg)?;
let fd_2 = Self::create_type_safe_fd(fd_values[1], origin, msg)?;
let socket_1 = StreamingSocket {
file_descriptor: fd_1,
is_non_blocking: IoxAtomicBool::new(false),
Expand All @@ -220,6 +244,30 @@ impl StreamingSocket {
)
}

/// Duplicates a [`StreamingSocket`]. It is connected to all existing sockets.
pub fn duplicate(&self) -> Result<StreamingSocket, StreamingSocketDuplicateError> {
let origin = "StreamingSocket::duplicate()";
let msg = "Unable to duplicate StreamingSocket";
let duplicated_fd = unsafe { posix::dup(self.file_descriptor.native_handle()) };
if duplicated_fd != -1 {
let new_socket = StreamingSocket {
file_descriptor: Self::create_type_safe_fd(duplicated_fd, origin, msg)
.map_err(|_| StreamingSocketDuplicateError::FileDescriptorBroken)?,
is_non_blocking: IoxAtomicBool::new(false),
};

fail!(from origin, when new_socket.set_non_blocking(true),
"{msg} since the duplicated streaming socket could not be set to non-blocking.");

return Ok(new_socket);
}

handle_errno!(StreamingSocketDuplicateError, from origin,
Errno::EMFILE => (PerProcessFileHandleLimitReached, "{msg} since the processes file descriptor limit was reached."),
v => (UnknownError(v as i32), "{msg} since an unknown error occurred ({v}).")
)
}

fn fcntl(&self, command: i32, value: i32, msg: &str) -> Result<i32, FcntlError> {
let result =
unsafe { posix::fcntl_int(self.file_descriptor.native_handle(), command, value) };
Expand Down Expand Up @@ -320,6 +368,7 @@ impl StreamingSocket {
fatal Errno::EINVAL => ("This should never happen! {msg} since an internal argument was invalid."),
Errno::EINTR => (Interrupt, "{msg} since an interrupt signal was received."),
Errno::ECONNRESET => (ConnectionReset, "{msg} since the connection was reset."),
Errno::EPIPE => (Disconnected, "{msg} since the socket is no longer connected."),
Errno::ENOBUFS => (InsufficientResources, "{msg} due to insufficient resources."),
v => (UnknownError(v as i32), "{msg} since an unknown error occurred ({v}).")
)
Expand Down
85 changes: 85 additions & 0 deletions iceoryx2-bb/posix/tests/socket_pair_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,88 @@ fn peeking_message_does_not_remove_message() {
assert_that!(result.unwrap(), eq send_data.len());
assert_that!(send_data, eq received_data);
}

#[test]
fn send_from_duplicated_socket_works() {
let _watchdog = Watchdog::new();

let (sut_lhs, sut_rhs) = StreamingSocket::create_pair().unwrap();
let sut_lhs_dup = sut_lhs.duplicate().unwrap();

let send_data = Vec::from(b"!hello hypnotoad!");

let result = sut_lhs_dup.try_send(&send_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data.len());

let mut received_data = vec![];
received_data.resize(send_data.len(), 0);
let result = sut_rhs.try_receive(&mut received_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data.len());
assert_that!(send_data, eq received_data);
}

#[test]
fn receive_from_duplicated_socket_works() {
let _watchdog = Watchdog::new();

let (sut_lhs, sut_rhs) = StreamingSocket::create_pair().unwrap();
let sut_rhs_dup = sut_rhs.duplicate().unwrap();

let send_data = Vec::from(b"!hello hypnotoad!");

let result = sut_lhs.try_send(&send_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data.len());

let mut received_data = vec![];
received_data.resize(send_data.len(), 0);
let result = sut_rhs_dup.try_receive(&mut received_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data.len());
assert_that!(send_data, eq received_data);
}

#[test]
fn multiple_duplicated_sockets_can_send() {
let _watchdog = Watchdog::new();

let (sut_lhs, sut_rhs) = StreamingSocket::create_pair().unwrap();
let sut_lhs_dup_1 = sut_lhs.duplicate().unwrap();
let sut_lhs_dup_2 = sut_lhs.duplicate().unwrap();

let send_data_1 = Vec::from(b"!1!");
let send_data_2 = Vec::from(b"!2!");
let send_data_3 = Vec::from(b"!3!");

let result = sut_lhs.try_send(&send_data_1);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data_1.len());

let result = sut_lhs_dup_1.try_send(&send_data_2);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data_2.len());

let result = sut_lhs_dup_2.try_send(&send_data_3);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data_3.len());

let mut received_data = vec![];
received_data.resize(send_data_1.len(), 0);

let result = sut_rhs.try_receive(&mut received_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data_1.len());
assert_that!(send_data_1, eq received_data);

let result = sut_rhs.try_receive(&mut received_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data_2.len());
assert_that!(send_data_2, eq received_data);

let result = sut_rhs.try_receive(&mut received_data);
assert_that!(result, is_ok);
assert_that!(result.unwrap(), eq send_data_3.len());
assert_that!(send_data_3, eq received_data);
}
24 changes: 13 additions & 11 deletions iceoryx2-cal/src/dynamic_storage/process_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ extern crate alloc;
use alloc::sync::Arc;

pub use crate::dynamic_storage::*;
use crate::named_concept::{
NamedConceptDoesExistError, NamedConceptListError, NamedConceptRemoveError,
};
use crate::static_storage::file::NamedConceptConfiguration;

use self::dynamic_storage_configuration::DynamicStorageConfiguration;
Expand Down Expand Up @@ -206,12 +209,12 @@ impl<T: Send + Sync + Debug + 'static> NamedConceptMgmt for Storage<T> {
fn does_exist_cfg(
name: &FileName,
config: &Self::Configuration,
) -> Result<bool, crate::static_storage::file::NamedConceptDoesExistError> {
) -> Result<bool, NamedConceptDoesExistError> {
let msg = "Unable to check if dynamic_storage::process_local exists";
let origin = "dynamic_storage::process_local::Storage::does_exist_cfg()";

let guard = fatal_panic!(from origin,
when PROCESS_LOCAL_STORAGE.lock(),
let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
with NamedConceptDoesExistError::InternalError,
"{} since the lock could not be acquired.", msg);

match guard.get(&config.path_for(name)) {
Expand All @@ -220,14 +223,12 @@ impl<T: Send + Sync + Debug + 'static> NamedConceptMgmt for Storage<T> {
}
}

fn list_cfg(
config: &Self::Configuration,
) -> Result<Vec<FileName>, crate::static_storage::file::NamedConceptListError> {
fn list_cfg(config: &Self::Configuration) -> Result<Vec<FileName>, NamedConceptListError> {
let msg = "Unable to list all dynamic_storage::process_local";
let origin = "dynamic_storage::process_local::Storage::list_cfg()";

let guard = fatal_panic!(from origin,
when PROCESS_LOCAL_STORAGE.lock(),
let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
with NamedConceptListError::InternalError,
"{} since the lock could not be acquired.", msg);

let mut result = vec![];
Expand All @@ -243,14 +244,15 @@ impl<T: Send + Sync + Debug + 'static> NamedConceptMgmt for Storage<T> {
unsafe fn remove_cfg(
name: &FileName,
cfg: &Self::Configuration,
) -> Result<bool, crate::static_storage::file::NamedConceptRemoveError> {
) -> Result<bool, NamedConceptRemoveError> {
let storage_name = cfg.path_for(name);

let msg = "Unable to remove dynamic_storage::process_local";
let origin = "dynamic_storage::process_local::Storage::remove_cfg()";

let mut guard = fatal_panic!(from origin, when PROCESS_LOCAL_STORAGE.lock()
, "{} since the lock could not be acquired.", msg);
let mut guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(),
with NamedConceptRemoveError::InternalError,
"{} since the lock could not be acquired.", msg);

let mut entry = guard.get_mut(&storage_name);
if entry.is_none() {
Expand Down
3 changes: 3 additions & 0 deletions iceoryx2-cal/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

pub mod common;
pub mod id_tracker;
pub mod process_local_socketpair;
pub mod sem_bitset_posix_shared_memory;
pub mod sem_bitset_process_local;
pub mod signal_mechanism;
Expand All @@ -25,6 +26,7 @@ pub use iceoryx2_bb_system_types::path::Path;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum NotifierNotifyError {
Interrupt,
FailedToDeliverSignal,
TriggerIdOutOfBounds,
Disconnected,
Expand All @@ -41,6 +43,7 @@ impl std::error::Error for NotifierNotifyError {}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum NotifierCreateError {
Interrupt,
DoesNotExist,
InsufficientPermissions,
VersionMismatch,
Expand Down
Loading

0 comments on commit 3c86695

Please sign in to comment.