From 68b9108e5ece7f674db25ad8cb3eb3f56f980f10 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 17:46:25 +0100 Subject: [PATCH 1/9] [#508] Create socketpair event skeleton --- iceoryx2-cal/src/event/mod.rs | 1 + .../src/event/process_local_socketpair.rs | 279 ++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 iceoryx2-cal/src/event/process_local_socketpair.rs diff --git a/iceoryx2-cal/src/event/mod.rs b/iceoryx2-cal/src/event/mod.rs index 6421c1d16..3b7f7d52e 100644 --- a/iceoryx2-cal/src/event/mod.rs +++ b/iceoryx2-cal/src/event/mod.rs @@ -12,6 +12,7 @@ pub mod common; pub mod id_tracker; +pub mod process_local_socketpair; pub mod sem_bitset_posix_shared_memory; pub mod sem_bitset_process_local; pub mod signal_mechanism; diff --git a/iceoryx2-cal/src/event/process_local_socketpair.rs b/iceoryx2-cal/src/event/process_local_socketpair.rs new file mode 100644 index 000000000..af6e49a85 --- /dev/null +++ b/iceoryx2-cal/src/event/process_local_socketpair.rs @@ -0,0 +1,279 @@ +// Copyright (c) 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use std::{collections::HashMap, time::Duration}; + +pub use iceoryx2_bb_container::semantic_string::SemanticString; +use iceoryx2_bb_log::fatal_panic; +use iceoryx2_bb_posix::{ + file_descriptor::FileDescriptorBased, + file_descriptor_set::SynchronousMultiplexing, + mutex::{Handle, Mutex, MutexBuilder, MutexHandle}, +}; +pub use iceoryx2_bb_system_types::{file_name::FileName, file_path::FilePath, path::Path}; +use once_cell::sync::Lazy; + +use crate::named_concept::NamedConceptConfiguration; + +use super::{ + ListenerCreateError, ListenerWaitError, NamedConcept, NamedConceptBuilder, NamedConceptMgmt, + NotifierCreateError, NotifierNotifyError, TriggerId, +}; + +#[derive(Debug)] +struct StorageEntry {} + +static PROCESS_LOCAL_MTX_HANDLE: Lazy>> = + Lazy::new(MutexHandle::new); +static PROCESS_LOCAL_STORAGE: Lazy>> = Lazy::new(|| { + let result = MutexBuilder::new() + .is_interprocess_capable(false) + .create(HashMap::new(), &PROCESS_LOCAL_MTX_HANDLE); + + if result.is_err() { + fatal_panic!(from "PROCESS_LOCAL_STORAGE", "Failed to create global dynamic storage"); + } + + result.unwrap() +}); + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Configuration { + suffix: FileName, + prefix: FileName, + path: Path, +} + +impl Default for Configuration { + fn default() -> Self { + Self { + path: EventImpl::default_path_hint(), + suffix: EventImpl::default_suffix(), + prefix: EventImpl::default_prefix(), + } + } +} + +impl NamedConceptConfiguration for Configuration { + fn prefix(mut self, value: &FileName) -> Self { + self.prefix = *value; + self + } + + fn get_prefix(&self) -> &FileName { + &self.prefix + } + + fn suffix(mut self, value: &FileName) -> Self { + self.suffix = *value; + self + } + + fn path_hint(mut self, value: &Path) -> Self { + self.path = *value; + self + } + + fn get_suffix(&self) -> &FileName { + &self.suffix + } + + fn get_path_hint(&self) -> &Path { + &self.path + } +} + +#[derive(Debug)] +struct EventImpl {} + +impl NamedConceptMgmt for EventImpl { + type Configuration = Configuration; + + fn does_exist_cfg( + name: &FileName, + cfg: &Self::Configuration, + ) -> Result { + todo!() + } + + fn list_cfg( + cfg: &Self::Configuration, + ) -> Result, crate::named_concept::NamedConceptListError> { + todo!() + } + + unsafe fn remove_cfg( + name: &FileName, + cfg: &Self::Configuration, + ) -> Result { + todo!() + } + + fn remove_path_hint( + value: &Path, + ) -> Result<(), crate::named_concept::NamedConceptPathHintRemoveError> { + todo!() + } +} + +impl crate::event::Event for EventImpl { + type Notifier = Notifier; + type Listener = Listener; + type NotifierBuilder = NotifierBuilder; + type ListenerBuilder = ListenerBuilder; +} + +impl EventImpl { + fn default_path_hint() -> Path { + Path::new(b"").unwrap() + } + + fn default_prefix() -> FileName { + FileName::new(b"iox2").unwrap() + } + + fn default_suffix() -> FileName { + FileName::new(b".event").unwrap() + } +} + +#[derive(Debug)] +pub struct Notifier { + name: FileName, +} + +impl NamedConcept for Notifier { + fn name(&self) -> &FileName { + &self.name + } +} + +impl crate::event::Notifier for Notifier { + fn notify(&self, id: TriggerId) -> Result<(), NotifierNotifyError> { + todo!() + } +} + +#[derive(Debug)] +pub struct NotifierBuilder { + name: FileName, + config: Configuration, +} + +impl NamedConceptBuilder for NotifierBuilder { + fn new(name: &FileName) -> Self { + Self { + name: *name, + config: Configuration::default(), + } + } + + fn config(mut self, config: &Configuration) -> Self { + self.config = *config; + self + } +} + +impl crate::event::NotifierBuilder for NotifierBuilder { + fn timeout(self, _timeout: Duration) -> Self { + todo!() + } + + fn open(self) -> Result { + todo!() + } +} + +#[derive(Debug)] +pub struct Listener { + name: FileName, +} + +impl FileDescriptorBased for Listener { + fn file_descriptor(&self) -> &iceoryx2_bb_posix::file_descriptor::FileDescriptor { + todo!() + } +} + +impl SynchronousMultiplexing for Listener {} + +impl NamedConcept for Listener { + fn name(&self) -> &FileName { + &self.name + } +} + +impl crate::event::Listener for Listener { + fn try_wait_one(&self) -> Result, ListenerWaitError> { + todo!() + } + + fn timed_wait_one( + &self, + timeout: core::time::Duration, + ) -> Result, ListenerWaitError> { + todo!() + } + + fn blocking_wait_one(&self) -> Result, ListenerWaitError> { + todo!() + } + + fn try_wait_all(&self, mut callback: F) -> Result<(), ListenerWaitError> { + todo!() + } + + fn timed_wait_all( + &self, + mut callback: F, + timeout: Duration, + ) -> Result<(), ListenerWaitError> { + todo!() + } + + fn blocking_wait_all( + &self, + mut callback: F, + ) -> Result<(), ListenerWaitError> { + todo!() + } +} + +#[derive(Debug)] +pub struct ListenerBuilder { + name: FileName, + config: Configuration, +} + +impl NamedConceptBuilder for ListenerBuilder { + fn new(name: &FileName) -> Self { + Self { + name: *name, + config: Configuration::default(), + } + } + + fn config(mut self, config: &::Configuration) -> Self { + self.config = *config; + self + } +} + +impl crate::event::ListenerBuilder for ListenerBuilder { + fn trigger_id_max(self, _id: TriggerId) -> Self { + self + } + + fn create(self) -> Result { + todo!() + } +} From 85154d8dd81957b7bf0ea9388f51f9f410f18159 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 18:40:20 +0100 Subject: [PATCH 2/9] [#508] Implement listener --- .../src/event/process_local_socketpair.rs | 173 ++++++++++++++++-- iceoryx2-cal/tests/event_tests.rs | 3 + 2 files changed, 156 insertions(+), 20 deletions(-) diff --git a/iceoryx2-cal/src/event/process_local_socketpair.rs b/iceoryx2-cal/src/event/process_local_socketpair.rs index af6e49a85..ef189c2ed 100644 --- a/iceoryx2-cal/src/event/process_local_socketpair.rs +++ b/iceoryx2-cal/src/event/process_local_socketpair.rs @@ -10,14 +10,17 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT -use std::{collections::HashMap, time::Duration}; +use std::{char::MAX, collections::HashMap, time::Duration}; pub use iceoryx2_bb_container::semantic_string::SemanticString; -use iceoryx2_bb_log::fatal_panic; +use iceoryx2_bb_log::{fail, fatal_panic}; use iceoryx2_bb_posix::{ file_descriptor::FileDescriptorBased, file_descriptor_set::SynchronousMultiplexing, mutex::{Handle, Mutex, MutexBuilder, MutexHandle}, + socket_pair::{ + StreamingSocket, StreamingSocketPairCreationError, StreamingSocketPairReceiveError, + }, }; pub use iceoryx2_bb_system_types::{file_name::FileName, file_path::FilePath, path::Path}; use once_cell::sync::Lazy; @@ -29,8 +32,12 @@ use super::{ NotifierCreateError, NotifierNotifyError, TriggerId, }; +const MAX_BATCH_SIZE: usize = 512; + #[derive(Debug)] -struct StorageEntry {} +struct StorageEntry { + notifier: StreamingSocket, +} static PROCESS_LOCAL_MTX_HANDLE: Lazy>> = Lazy::new(MutexHandle::new); @@ -93,7 +100,7 @@ impl NamedConceptConfiguration for Configuration { } #[derive(Debug)] -struct EventImpl {} +pub struct EventImpl {} impl NamedConceptMgmt for EventImpl { type Configuration = Configuration; @@ -185,22 +192,36 @@ impl NamedConceptBuilder for NotifierBuilder { impl crate::event::NotifierBuilder for NotifierBuilder { fn timeout(self, _timeout: Duration) -> Self { - todo!() + self } fn open(self) -> Result { - todo!() + let msg = "Failed to open Notifier"; + let full_path = self.config.path_for(&self.name); + + let guard = fail!(from self, when PROCESS_LOCAL_STORAGE.lock(), + with NotifierCreateError::InternalFailure, + "{msg} due to a failure while acquiring the lock."); + + match guard.get(&full_path) { + Some(entry) => todo!(), + None => { + fail!(from self, with NotifierCreateError::DoesNotExist, + "{msg} since the event does not exist."); + } + }; } } #[derive(Debug)] pub struct Listener { name: FileName, + socket: StreamingSocket, } impl FileDescriptorBased for Listener { fn file_descriptor(&self) -> &iceoryx2_bb_posix::file_descriptor::FileDescriptor { - todo!() + self.socket.file_descriptor() } } @@ -212,39 +233,122 @@ impl NamedConcept for Listener { } } +impl Listener { + fn wait_one_impl< + WaitCall: FnMut(&mut [u8]) -> Result, + >( + &self, + mut waitcall: WaitCall, + msg: &str, + ) -> Result, ListenerWaitError> { + let trigger_id_size = core::mem::size_of::(); + let mut trigger_id: usize = 0; + let raw_trigger_id = unsafe { + core::slice::from_raw_parts_mut( + ((&mut trigger_id) as *mut usize) as *mut u8, + trigger_id_size, + ) + }; + + match waitcall(raw_trigger_id) { + Ok(number_of_bytes) => { + if number_of_bytes == 0 { + return Ok(None); + } else if number_of_bytes == trigger_id_size { + return Ok(Some(TriggerId::new(trigger_id))); + } else { + fail!(from self, with ListenerWaitError::ContractViolation, + "{msg} due to a contract violation. Expected to receive {} bytes but got {} bytes.", + trigger_id_size, number_of_bytes); + } + } + Err(StreamingSocketPairReceiveError::Interrupt) => { + fail!(from self, with ListenerWaitError::InterruptSignal, + "{msg} since an interrupt signal was received."); + } + Err(e) => { + fail!(from self, with ListenerWaitError::InternalFailure, + "{msg} due to an internal failure while receiving data on the underlying streaming socket ({:?}).", e); + } + } + } + + fn wait_all_impl< + WaitCall: FnMut(&mut [u8]) -> Result, + F: FnMut(TriggerId), + >( + &self, + mut callback: F, + waitcall: WaitCall, + msg: &str, + ) -> Result<(), ListenerWaitError> { + match self.wait_one_impl(waitcall, msg)? { + None => return Ok(()), + Some(trigger_id) => callback(trigger_id), + } + + for _ in 0..MAX_BATCH_SIZE { + match self.wait_one_impl(|buffer| self.socket.try_receive(buffer), msg)? { + None => return Ok(()), + Some(trigger_id) => callback(trigger_id), + } + } + + Ok(()) + } +} + impl crate::event::Listener for Listener { fn try_wait_one(&self) -> Result, ListenerWaitError> { - todo!() + self.wait_one_impl( + |buffer| self.socket.try_receive(buffer), + "Unable to try to receive a TriggerId", + ) } fn timed_wait_one( &self, timeout: core::time::Duration, ) -> Result, ListenerWaitError> { - todo!() + self.wait_one_impl( + |buffer| self.socket.timed_receive(buffer, timeout), + "Unable to receive a TriggerId with a timeout", + ) } fn blocking_wait_one(&self) -> Result, ListenerWaitError> { - todo!() + self.wait_one_impl( + |buffer| self.socket.blocking_receive(buffer), + "Unable to block until a TriggerId is received", + ) } - fn try_wait_all(&self, mut callback: F) -> Result<(), ListenerWaitError> { - todo!() + fn try_wait_all(&self, callback: F) -> Result<(), ListenerWaitError> { + self.wait_all_impl( + callback, + |buffer| self.socket.try_receive(buffer), + "Unable to try to receive all TriggerIds", + ) } fn timed_wait_all( &self, - mut callback: F, + callback: F, timeout: Duration, ) -> Result<(), ListenerWaitError> { - todo!() + self.wait_all_impl( + callback, + |buffer| self.socket.timed_receive(buffer, timeout), + "Unable to receive all TriggerIds with a timeout", + ) } - fn blocking_wait_all( - &self, - mut callback: F, - ) -> Result<(), ListenerWaitError> { - todo!() + fn blocking_wait_all(&self, callback: F) -> Result<(), ListenerWaitError> { + self.wait_all_impl( + callback, + |buffer| self.socket.blocking_receive(buffer), + "Unable to block until all TriggerIds are received", + ) } } @@ -274,6 +378,35 @@ impl crate::event::ListenerBuilder for ListenerBuilder { } fn create(self) -> Result { - todo!() + let msg = "Failed to create Listener"; + let full_path = self.config.path_for(&self.name); + + let mut guard = fail!(from self, when PROCESS_LOCAL_STORAGE.lock(), + with ListenerCreateError::InternalFailure, + "{msg} due to a failure while acquiring the lock."); + let entry = guard.get_mut(&full_path); + if entry.is_some() { + fail!(from self, with ListenerCreateError::AlreadyExists, + "{msg} since the event already exists."); + } + + let (notifier, listener) = match StreamingSocket::create_pair() { + Ok((notifier, listener)) => (notifier, listener), + Err(StreamingSocketPairCreationError::InsufficientPermissions) => { + fail!(from self, with ListenerCreateError::InsufficientPermissions, + "{msg} due to insufficient permissions to create a socket pair."); + } + Err(e) => { + fail!(from self, with ListenerCreateError::InternalFailure, + "{msg} due to an internal error while creating the socket pair ({:?}).", e); + } + }; + + guard.insert(full_path, StorageEntry { notifier }); + + Ok(Listener { + name: self.name, + socket: listener, + }) } } diff --git a/iceoryx2-cal/tests/event_tests.rs b/iceoryx2-cal/tests/event_tests.rs index e69fd94de..1a5570e1b 100644 --- a/iceoryx2-cal/tests/event_tests.rs +++ b/iceoryx2-cal/tests/event_tests.rs @@ -734,6 +734,9 @@ mod event { } } + #[instantiate_tests()] + mod process_local_socket_pair {} + #[instantiate_tests()] mod unix_datagram {} From 37e20b1be0ecc483cd300738b3c36e7648d7cc2a Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 19:02:23 +0100 Subject: [PATCH 3/9] [#508] StreamingSockets can be duplicated --- iceoryx2-bb/posix/src/socket_pair.rs | 55 +++++++++---- iceoryx2-bb/posix/tests/socket_pair_tests.rs | 85 ++++++++++++++++++++ 2 files changed, 126 insertions(+), 14 deletions(-) diff --git a/iceoryx2-bb/posix/src/socket_pair.rs b/iceoryx2-bb/posix/src/socket_pair.rs index abb822291..59b4fc6af 100644 --- a/iceoryx2-bb/posix/src/socket_pair.rs +++ b/iceoryx2-bb/posix/src/socket_pair.rs @@ -166,11 +166,26 @@ impl SynchronousMultiplexing for StreamingSocket {} unsafe impl Send for StreamingSocket {} impl StreamingSocket { + fn create_type_safe_fd( + raw_fd: i32, + origin: &str, + msg: &str, + ) -> Result { + match FileDescriptor::new(raw_fd) { + Some(fd) => Ok(fd), + None => { + fail!(from origin, + with StreamingSocketPairCreationError::FileDescriptorBroken, + "This should never happen! {msg} since the socketpair implementation returned a broken file descriptor."); + } + } + } + /// Creates a new [`StreamingSocket`] pair. pub fn create_pair( ) -> Result<(StreamingSocket, StreamingSocket), StreamingSocketPairCreationError> { let msg = "Unable to create streaming socket pair"; - let origin = "StreamingSocketPairBuilder::create()"; + let origin = "StreamingSocket::create_pair()"; let mut fd_values = [0, 0]; if unsafe { @@ -182,19 +197,8 @@ impl StreamingSocket { ) } == 0 { - let create_fd = |fd| -> Result { - match FileDescriptor::new(fd) { - Some(fd) => Ok(fd), - None => { - fail!(from origin, - with StreamingSocketPairCreationError::FileDescriptorBroken, - "This should never happen! {msg} since the socketpair implementation returned a broken file descriptor."); - } - } - }; - - let fd_1 = create_fd(fd_values[0])?; - let fd_2 = create_fd(fd_values[1])?; + let fd_1 = Self::create_type_safe_fd(fd_values[0], origin, msg)?; + let fd_2 = Self::create_type_safe_fd(fd_values[1], origin, msg)?; let socket_1 = StreamingSocket { file_descriptor: fd_1, is_non_blocking: IoxAtomicBool::new(false), @@ -220,6 +224,29 @@ impl StreamingSocket { ) } + /// Duplicates a [`StreamingSocket`]. It is connected to all existing sockets. + pub fn duplicate(&self) -> Result { + let origin = "StreamingSocket::duplicate()"; + let msg = "Unable to duplicate StreamingSocket"; + let duplicated_fd = unsafe { posix::dup(self.file_descriptor.native_handle()) }; + if duplicated_fd != -1 { + let new_socket = StreamingSocket { + file_descriptor: Self::create_type_safe_fd(duplicated_fd, origin, msg)?, + is_non_blocking: IoxAtomicBool::new(false), + }; + + fail!(from origin, when new_socket.set_non_blocking(true), + "{msg} since the duplicated streaming socket could not be set to non-blocking."); + + return Ok(new_socket); + } + + handle_errno!(StreamingSocketPairCreationError, from origin, + Errno::EMFILE => (PerProcessFileHandleLimitReached, "{msg} since the processes file descriptor limit was reached."), + v => (UnknownError(v as i32), "{msg} since an unknown error occurred ({v}).") + ) + } + fn fcntl(&self, command: i32, value: i32, msg: &str) -> Result { let result = unsafe { posix::fcntl_int(self.file_descriptor.native_handle(), command, value) }; diff --git a/iceoryx2-bb/posix/tests/socket_pair_tests.rs b/iceoryx2-bb/posix/tests/socket_pair_tests.rs index b616fe05b..e70cb5f1c 100644 --- a/iceoryx2-bb/posix/tests/socket_pair_tests.rs +++ b/iceoryx2-bb/posix/tests/socket_pair_tests.rs @@ -330,3 +330,88 @@ fn peeking_message_does_not_remove_message() { assert_that!(result.unwrap(), eq send_data.len()); assert_that!(send_data, eq received_data); } + +#[test] +fn send_from_duplicated_socket_works() { + let _watchdog = Watchdog::new(); + + let (sut_lhs, sut_rhs) = StreamingSocket::create_pair().unwrap(); + let sut_lhs_dup = sut_lhs.duplicate().unwrap(); + + let send_data = Vec::from(b"!hello hypnotoad!"); + + let result = sut_lhs_dup.try_send(&send_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data.len()); + + let mut received_data = vec![]; + received_data.resize(send_data.len(), 0); + let result = sut_rhs.try_receive(&mut received_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data.len()); + assert_that!(send_data, eq received_data); +} + +#[test] +fn receive_from_duplicated_socket_works() { + let _watchdog = Watchdog::new(); + + let (sut_lhs, sut_rhs) = StreamingSocket::create_pair().unwrap(); + let sut_rhs_dup = sut_rhs.duplicate().unwrap(); + + let send_data = Vec::from(b"!hello hypnotoad!"); + + let result = sut_lhs.try_send(&send_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data.len()); + + let mut received_data = vec![]; + received_data.resize(send_data.len(), 0); + let result = sut_rhs_dup.try_receive(&mut received_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data.len()); + assert_that!(send_data, eq received_data); +} + +#[test] +fn multiple_duplicated_sockets_can_send() { + let _watchdog = Watchdog::new(); + + let (sut_lhs, sut_rhs) = StreamingSocket::create_pair().unwrap(); + let sut_lhs_dup_1 = sut_lhs.duplicate().unwrap(); + let sut_lhs_dup_2 = sut_lhs.duplicate().unwrap(); + + let send_data_1 = Vec::from(b"!1!"); + let send_data_2 = Vec::from(b"!2!"); + let send_data_3 = Vec::from(b"!3!"); + + let result = sut_lhs.try_send(&send_data_1); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data_1.len()); + + let result = sut_lhs_dup_1.try_send(&send_data_2); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data_2.len()); + + let result = sut_lhs_dup_2.try_send(&send_data_3); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data_3.len()); + + let mut received_data = vec![]; + received_data.resize(send_data_1.len(), 0); + + let result = sut_rhs.try_receive(&mut received_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data_1.len()); + assert_that!(send_data_1, eq received_data); + + let result = sut_rhs.try_receive(&mut received_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data_2.len()); + assert_that!(send_data_2, eq received_data); + + let result = sut_rhs.try_receive(&mut received_data); + assert_that!(result, is_ok); + assert_that!(result.unwrap(), eq send_data_3.len()); + assert_that!(send_data_3, eq received_data); +} From 65295716669e86cced923a83356dab64cb0ba965 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 19:31:44 +0100 Subject: [PATCH 4/9] [#508] Implement event concept socket pair management functions --- iceoryx2-bb/posix/src/socket_pair.rs | 26 +++- .../src/dynamic_storage/process_local.rs | 24 ++-- iceoryx2-cal/src/event/mod.rs | 2 + .../src/event/process_local_socketpair.rs | 124 +++++++++++++++--- 4 files changed, 142 insertions(+), 34 deletions(-) diff --git a/iceoryx2-bb/posix/src/socket_pair.rs b/iceoryx2-bb/posix/src/socket_pair.rs index 59b4fc6af..3376fd18d 100644 --- a/iceoryx2-bb/posix/src/socket_pair.rs +++ b/iceoryx2-bb/posix/src/socket_pair.rs @@ -41,6 +41,25 @@ use crate::{ const BLOCKING_TIMEOUT: Duration = Duration::from_secs(i16::MAX as _); +/// Defines the errors that can occur when a socket pair is created with +/// [`StreamingSocket::create_pair()`]. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum StreamingSocketDuplicateError { + PerProcessFileHandleLimitReached, + Interrupt, + FileDescriptorBroken, + UnknownError(i32), +} + +impl From for StreamingSocketDuplicateError { + fn from(value: FcntlError) -> Self { + match value { + FcntlError::Interrupt => StreamingSocketDuplicateError::Interrupt, + FcntlError::UnknownError(v) => StreamingSocketDuplicateError::UnknownError(v), + } + } +} + /// Defines the errors that can occur when a socket pair is created with /// [`StreamingSocket::create_pair()`]. #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -225,13 +244,14 @@ impl StreamingSocket { } /// Duplicates a [`StreamingSocket`]. It is connected to all existing sockets. - pub fn duplicate(&self) -> Result { + pub fn duplicate(&self) -> Result { let origin = "StreamingSocket::duplicate()"; let msg = "Unable to duplicate StreamingSocket"; let duplicated_fd = unsafe { posix::dup(self.file_descriptor.native_handle()) }; if duplicated_fd != -1 { let new_socket = StreamingSocket { - file_descriptor: Self::create_type_safe_fd(duplicated_fd, origin, msg)?, + file_descriptor: Self::create_type_safe_fd(duplicated_fd, origin, msg) + .map_err(|_| StreamingSocketDuplicateError::FileDescriptorBroken)?, is_non_blocking: IoxAtomicBool::new(false), }; @@ -241,7 +261,7 @@ impl StreamingSocket { return Ok(new_socket); } - handle_errno!(StreamingSocketPairCreationError, from origin, + handle_errno!(StreamingSocketDuplicateError, from origin, Errno::EMFILE => (PerProcessFileHandleLimitReached, "{msg} since the processes file descriptor limit was reached."), v => (UnknownError(v as i32), "{msg} since an unknown error occurred ({v}).") ) diff --git a/iceoryx2-cal/src/dynamic_storage/process_local.rs b/iceoryx2-cal/src/dynamic_storage/process_local.rs index e030cb91f..bece83dd8 100644 --- a/iceoryx2-cal/src/dynamic_storage/process_local.rs +++ b/iceoryx2-cal/src/dynamic_storage/process_local.rs @@ -58,6 +58,9 @@ extern crate alloc; use alloc::sync::Arc; pub use crate::dynamic_storage::*; +use crate::named_concept::{ + NamedConceptDoesExistError, NamedConceptListError, NamedConceptRemoveError, +}; use crate::static_storage::file::NamedConceptConfiguration; use self::dynamic_storage_configuration::DynamicStorageConfiguration; @@ -206,12 +209,12 @@ impl NamedConceptMgmt for Storage { fn does_exist_cfg( name: &FileName, config: &Self::Configuration, - ) -> Result { + ) -> Result { let msg = "Unable to check if dynamic_storage::process_local exists"; let origin = "dynamic_storage::process_local::Storage::does_exist_cfg()"; - let guard = fatal_panic!(from origin, - when PROCESS_LOCAL_STORAGE.lock(), + let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(), + with NamedConceptDoesExistError::InternalError, "{} since the lock could not be acquired.", msg); match guard.get(&config.path_for(name)) { @@ -220,14 +223,12 @@ impl NamedConceptMgmt for Storage { } } - fn list_cfg( - config: &Self::Configuration, - ) -> Result, crate::static_storage::file::NamedConceptListError> { + fn list_cfg(config: &Self::Configuration) -> Result, NamedConceptListError> { let msg = "Unable to list all dynamic_storage::process_local"; let origin = "dynamic_storage::process_local::Storage::list_cfg()"; - let guard = fatal_panic!(from origin, - when PROCESS_LOCAL_STORAGE.lock(), + let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(), + with NamedConceptListError::InternalError, "{} since the lock could not be acquired.", msg); let mut result = vec![]; @@ -243,14 +244,15 @@ impl NamedConceptMgmt for Storage { unsafe fn remove_cfg( name: &FileName, cfg: &Self::Configuration, - ) -> Result { + ) -> Result { let storage_name = cfg.path_for(name); let msg = "Unable to remove dynamic_storage::process_local"; let origin = "dynamic_storage::process_local::Storage::remove_cfg()"; - let mut guard = fatal_panic!(from origin, when PROCESS_LOCAL_STORAGE.lock() - , "{} since the lock could not be acquired.", msg); + let mut guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(), + with NamedConceptRemoveError::InternalError, + "{} since the lock could not be acquired.", msg); let mut entry = guard.get_mut(&storage_name); if entry.is_none() { diff --git a/iceoryx2-cal/src/event/mod.rs b/iceoryx2-cal/src/event/mod.rs index 3b7f7d52e..e1366cda1 100644 --- a/iceoryx2-cal/src/event/mod.rs +++ b/iceoryx2-cal/src/event/mod.rs @@ -26,6 +26,7 @@ pub use iceoryx2_bb_system_types::path::Path; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum NotifierNotifyError { + Interrupt, FailedToDeliverSignal, TriggerIdOutOfBounds, Disconnected, @@ -42,6 +43,7 @@ impl std::error::Error for NotifierNotifyError {} #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum NotifierCreateError { + Interrupt, DoesNotExist, InsufficientPermissions, VersionMismatch, diff --git a/iceoryx2-cal/src/event/process_local_socketpair.rs b/iceoryx2-cal/src/event/process_local_socketpair.rs index ef189c2ed..677cd035f 100644 --- a/iceoryx2-cal/src/event/process_local_socketpair.rs +++ b/iceoryx2-cal/src/event/process_local_socketpair.rs @@ -10,7 +10,7 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT -use std::{char::MAX, collections::HashMap, time::Duration}; +use std::{collections::HashMap, time::Duration}; pub use iceoryx2_bb_container::semantic_string::SemanticString; use iceoryx2_bb_log::{fail, fatal_panic}; @@ -19,13 +19,17 @@ use iceoryx2_bb_posix::{ file_descriptor_set::SynchronousMultiplexing, mutex::{Handle, Mutex, MutexBuilder, MutexHandle}, socket_pair::{ - StreamingSocket, StreamingSocketPairCreationError, StreamingSocketPairReceiveError, + StreamingSocket, StreamingSocketDuplicateError, StreamingSocketPairCreationError, + StreamingSocketPairReceiveError, StreamingSocketPairSendError, }, }; pub use iceoryx2_bb_system_types::{file_name::FileName, file_path::FilePath, path::Path}; use once_cell::sync::Lazy; -use crate::named_concept::NamedConceptConfiguration; +use crate::named_concept::{ + NamedConceptConfiguration, NamedConceptDoesExistError, NamedConceptListError, + NamedConceptRemoveError, +}; use super::{ ListenerCreateError, ListenerWaitError, NamedConcept, NamedConceptBuilder, NamedConceptMgmt, @@ -108,27 +112,63 @@ impl NamedConceptMgmt for EventImpl { fn does_exist_cfg( name: &FileName, cfg: &Self::Configuration, - ) -> Result { - todo!() + ) -> Result { + let msg = "Unable to check if event::process_local_socketpair exists"; + let origin = "event::process_local_socketpair::Event::does_exist_cfg()"; + + let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(), + with NamedConceptDoesExistError::InternalError, + "{} since the lock could not be acquired.", msg); + + match guard.get(&cfg.path_for(name)) { + Some(_) => Ok(true), + None => Ok(false), + } } - fn list_cfg( - cfg: &Self::Configuration, - ) -> Result, crate::named_concept::NamedConceptListError> { - todo!() + fn list_cfg(cfg: &Self::Configuration) -> Result, NamedConceptListError> { + let msg = "Unable to list all event::process_local_socketpairs"; + let origin = "event::process_local_socketpair::Event::list_cfg()"; + + let guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(), + with NamedConceptListError::InternalError, + "{} since the lock could not be acquired.", msg); + + let mut result = vec![]; + for storage_name in guard.keys() { + if let Some(v) = cfg.extract_name_from_path(storage_name) { + result.push(v); + } + } + + Ok(result) } unsafe fn remove_cfg( name: &FileName, cfg: &Self::Configuration, - ) -> Result { - todo!() + ) -> Result { + let storage_name = cfg.path_for(name); + + let msg = "Unable to remove dynamic_storage::process_local"; + let origin = "dynamic_storage::process_local::Storage::remove_cfg()"; + + let mut guard = fail!(from origin, when PROCESS_LOCAL_STORAGE.lock(), + with NamedConceptRemoveError::InternalError, + "{} since the lock could not be acquired.", msg); + + let entry = guard.get_mut(&storage_name); + if entry.is_none() { + return Ok(false); + } + + Ok(guard.remove(&storage_name).is_some()) } fn remove_path_hint( - value: &Path, + _value: &Path, ) -> Result<(), crate::named_concept::NamedConceptPathHintRemoveError> { - todo!() + Ok(()) } } @@ -155,6 +195,7 @@ impl EventImpl { #[derive(Debug)] pub struct Notifier { + socket: StreamingSocket, name: FileName, } @@ -166,7 +207,37 @@ impl NamedConcept for Notifier { impl crate::event::Notifier for Notifier { fn notify(&self, id: TriggerId) -> Result<(), NotifierNotifyError> { - todo!() + let msg = "Unable to send notification"; + let buffer = unsafe { + core::slice::from_raw_parts( + (&id as *const TriggerId) as *const u8, + core::mem::size_of::(), + ) + }; + match self.socket.try_send(buffer) { + Ok(number_of_bytes) => { + if number_of_bytes == 0 { + fail!(from self, with NotifierNotifyError::FailedToDeliverSignal, + "{msg} {id:?} since the listener buffer seems to be full."); + } else if number_of_bytes == core::mem::size_of::() { + Ok(()) + } else { + fatal_panic!(from self, "This should never happen! {msg} {id:?} could be sent only partially."); + } + } + Err(StreamingSocketPairSendError::Interrupt) => { + fail!(from self, with NotifierNotifyError::Interrupt, + "{msg} since an interrupt signal was received."); + } + Err(StreamingSocketPairSendError::ConnectionReset) => { + fail!(from self, with NotifierNotifyError::Disconnected, + "{msg} since the corresponding listener disconnected."); + } + Err(e) => { + fail!(from self, with NotifierNotifyError::InternalFailure, + "{msg} due to an unknown failure ({:?}).", e); + } + } } } @@ -204,12 +275,25 @@ impl crate::event::NotifierBuilder for NotifierBuilder { "{msg} due to a failure while acquiring the lock."); match guard.get(&full_path) { - Some(entry) => todo!(), + Some(entry) => match entry.notifier.duplicate() { + Ok(socket) => Ok(Notifier { + name: self.name, + socket, + }), + Err(StreamingSocketDuplicateError::Interrupt) => { + fail!(from self, with NotifierCreateError::Interrupt, + "{msg} since an interrupt signal was received."); + } + Err(e) => { + fail!(from self, with NotifierCreateError::InternalFailure, + "{msg} due to an unknown failure ({:?}).", e); + } + }, None => { fail!(from self, with NotifierCreateError::DoesNotExist, "{msg} since the event does not exist."); } - }; + } } } @@ -241,11 +325,11 @@ impl Listener { mut waitcall: WaitCall, msg: &str, ) -> Result, ListenerWaitError> { - let trigger_id_size = core::mem::size_of::(); - let mut trigger_id: usize = 0; + let trigger_id_size = core::mem::size_of::(); + let mut trigger_id = TriggerId::new(0); let raw_trigger_id = unsafe { core::slice::from_raw_parts_mut( - ((&mut trigger_id) as *mut usize) as *mut u8, + ((&mut trigger_id) as *mut TriggerId) as *mut u8, trigger_id_size, ) }; @@ -255,7 +339,7 @@ impl Listener { if number_of_bytes == 0 { return Ok(None); } else if number_of_bytes == trigger_id_size { - return Ok(Some(TriggerId::new(trigger_id))); + return Ok(Some(trigger_id)); } else { fail!(from self, with ListenerWaitError::ContractViolation, "{msg} due to a contract violation. Expected to receive {} bytes but got {} bytes.", From 25c9fe374be725aa1cd5704f9381ac7161ef8006 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 19:37:42 +0100 Subject: [PATCH 5/9] [#508] Implement listener safeguards --- .../src/event/process_local_socketpair.rs | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/iceoryx2-cal/src/event/process_local_socketpair.rs b/iceoryx2-cal/src/event/process_local_socketpair.rs index 677cd035f..b16a8bd78 100644 --- a/iceoryx2-cal/src/event/process_local_socketpair.rs +++ b/iceoryx2-cal/src/event/process_local_socketpair.rs @@ -13,7 +13,7 @@ use std::{collections::HashMap, time::Duration}; pub use iceoryx2_bb_container::semantic_string::SemanticString; -use iceoryx2_bb_log::{fail, fatal_panic}; +use iceoryx2_bb_log::{debug, fail, fatal_panic}; use iceoryx2_bb_posix::{ file_descriptor::FileDescriptorBased, file_descriptor_set::SynchronousMultiplexing, @@ -32,8 +32,8 @@ use crate::named_concept::{ }; use super::{ - ListenerCreateError, ListenerWaitError, NamedConcept, NamedConceptBuilder, NamedConceptMgmt, - NotifierCreateError, NotifierNotifyError, TriggerId, + Event, ListenerCreateError, ListenerWaitError, NamedConcept, NamedConceptBuilder, + NamedConceptMgmt, NotifierCreateError, NotifierNotifyError, TriggerId, }; const MAX_BATCH_SIZE: usize = 512; @@ -179,20 +179,6 @@ impl crate::event::Event for EventImpl { type ListenerBuilder = ListenerBuilder; } -impl EventImpl { - fn default_path_hint() -> Path { - Path::new(b"").unwrap() - } - - fn default_prefix() -> FileName { - FileName::new(b"iox2").unwrap() - } - - fn default_suffix() -> FileName { - FileName::new(b".event").unwrap() - } -} - #[derive(Debug)] pub struct Notifier { socket: StreamingSocket, @@ -301,6 +287,15 @@ impl crate::event::NotifierBuilder for NotifierBuilder { pub struct Listener { name: FileName, socket: StreamingSocket, + config: Configuration, +} + +impl Drop for Listener { + fn drop(&mut self) { + if let Err(e) = unsafe { EventImpl::remove_cfg(&self.name, &self.config) } { + debug!(from self, "Unable to cleanup event after the Listener was dropped ({:?}).", e); + } + } } impl FileDescriptorBased for Listener { @@ -491,6 +486,7 @@ impl crate::event::ListenerBuilder for ListenerBuilder { Ok(Listener { name: self.name, socket: listener, + config: self.config, }) } } From 363ab3f6854f2d2eefcfe79b6c5c22268b89d3bf Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 19:43:47 +0100 Subject: [PATCH 6/9] [#508] SocketPair event concept implemented --- iceoryx2-bb/posix/src/socket_pair.rs | 2 ++ iceoryx2-cal/src/event/process_local_socketpair.rs | 7 ++++--- iceoryx2-cal/tests/event_tests.rs | 3 +++ iceoryx2/src/port/notifier.rs | 3 +++ 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/iceoryx2-bb/posix/src/socket_pair.rs b/iceoryx2-bb/posix/src/socket_pair.rs index 3376fd18d..aad00c21e 100644 --- a/iceoryx2-bb/posix/src/socket_pair.rs +++ b/iceoryx2-bb/posix/src/socket_pair.rs @@ -106,6 +106,7 @@ pub enum StreamingSocketPairSendError { InsufficientResources, Interrupt, ConnectionReset, + Disconnected, UnknownError(i32), } @@ -367,6 +368,7 @@ impl StreamingSocket { fatal Errno::EINVAL => ("This should never happen! {msg} since an internal argument was invalid."), Errno::EINTR => (Interrupt, "{msg} since an interrupt signal was received."), Errno::ECONNRESET => (ConnectionReset, "{msg} since the connection was reset."), + Errno::EPIPE => (Disconnected, "{msg} since the socket is no longer connected."), Errno::ENOBUFS => (InsufficientResources, "{msg} due to insufficient resources."), v => (UnknownError(v as i32), "{msg} since an unknown error occurred ({v}).") ) diff --git a/iceoryx2-cal/src/event/process_local_socketpair.rs b/iceoryx2-cal/src/event/process_local_socketpair.rs index b16a8bd78..704169eb6 100644 --- a/iceoryx2-cal/src/event/process_local_socketpair.rs +++ b/iceoryx2-cal/src/event/process_local_socketpair.rs @@ -215,7 +215,8 @@ impl crate::event::Notifier for Notifier { fail!(from self, with NotifierNotifyError::Interrupt, "{msg} since an interrupt signal was received."); } - Err(StreamingSocketPairSendError::ConnectionReset) => { + Err(StreamingSocketPairSendError::ConnectionReset) + | Err(StreamingSocketPairSendError::Disconnected) => { fail!(from self, with NotifierNotifyError::Disconnected, "{msg} since the corresponding listener disconnected."); } @@ -332,9 +333,9 @@ impl Listener { match waitcall(raw_trigger_id) { Ok(number_of_bytes) => { if number_of_bytes == 0 { - return Ok(None); + Ok(None) } else if number_of_bytes == trigger_id_size { - return Ok(Some(trigger_id)); + Ok(Some(trigger_id)) } else { fail!(from self, with ListenerWaitError::ContractViolation, "{msg} due to a contract violation. Expected to receive {} bytes but got {} bytes.", diff --git a/iceoryx2-cal/tests/event_tests.rs b/iceoryx2-cal/tests/event_tests.rs index 1a5570e1b..b228b0d13 100644 --- a/iceoryx2-cal/tests/event_tests.rs +++ b/iceoryx2-cal/tests/event_tests.rs @@ -19,6 +19,7 @@ mod event { use std::time::Instant; use iceoryx2_bb_container::semantic_string::*; + use iceoryx2_bb_log::{set_log_level, LogLevel}; use iceoryx2_bb_posix::barrier::*; use iceoryx2_bb_system_types::file_name::FileName; use iceoryx2_bb_testing::watchdog::Watchdog; @@ -712,6 +713,7 @@ mod event { #[test] fn out_of_scope_listener_shall_not_corrupt_notifier() { + set_log_level(LogLevel::Trace); let name = generate_name(); let config = generate_isolated_config::(); @@ -727,6 +729,7 @@ mod event { drop(sut_listener); let result = sut_notifier.notify(TriggerId::new(0)); + // either present a disconnect error when available or continue sending without counterpart, for // instance when the event is network socket based if result.is_err() { diff --git a/iceoryx2/src/port/notifier.rs b/iceoryx2/src/port/notifier.rs index 93a39d19b..2ec421437 100644 --- a/iceoryx2/src/port/notifier.rs +++ b/iceoryx2/src/port/notifier.rs @@ -155,6 +155,9 @@ impl ListenerConnections { Err(iceoryx2_cal::event::NotifierCreateError::InsufficientPermissions) => { warn!(from self, "{} since the permissions do not match. The service or the participants are maybe misconfigured.", msg); } + Err(iceoryx2_cal::event::NotifierCreateError::Interrupt) => { + debug!(from self, "{} since an interrupt signal was received.", msg); + } Err(iceoryx2_cal::event::NotifierCreateError::InternalFailure) => { debug!(from self, "{} due to an internal failure.", msg); } From d1ac9bf5578ac811ef1fa6f2dc396508dae09b9e Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 20:10:01 +0100 Subject: [PATCH 7/9] [#508] Make socket duplication available for windows --- .../src/event/process_local_socketpair.rs | 4 +- iceoryx2-pal/posix/src/windows/unistd.rs | 37 +++++++++++++++++-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/iceoryx2-cal/src/event/process_local_socketpair.rs b/iceoryx2-cal/src/event/process_local_socketpair.rs index 704169eb6..fca2ca3bd 100644 --- a/iceoryx2-cal/src/event/process_local_socketpair.rs +++ b/iceoryx2-cal/src/event/process_local_socketpair.rs @@ -10,8 +10,7 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT -use std::{collections::HashMap, time::Duration}; - +use core::time::Duration; pub use iceoryx2_bb_container::semantic_string::SemanticString; use iceoryx2_bb_log::{debug, fail, fatal_panic}; use iceoryx2_bb_posix::{ @@ -25,6 +24,7 @@ use iceoryx2_bb_posix::{ }; pub use iceoryx2_bb_system_types::{file_name::FileName, file_path::FilePath, path::Path}; use once_cell::sync::Lazy; +use std::collections::HashMap; use crate::named_concept::{ NamedConceptConfiguration, NamedConceptDoesExistError, NamedConceptListError, diff --git a/iceoryx2-pal/posix/src/windows/unistd.rs b/iceoryx2-pal/posix/src/windows/unistd.rs index 884e8c03d..66150cf7f 100644 --- a/iceoryx2-pal/posix/src/windows/unistd.rs +++ b/iceoryx2-pal/posix/src/windows/unistd.rs @@ -18,7 +18,10 @@ use windows_sys::Win32::{ Foundation::{ CloseHandle, ERROR_FILE_NOT_FOUND, ERROR_NO_MORE_FILES, FALSE, INVALID_HANDLE_VALUE, TRUE, }, - Networking::WinSock::closesocket, + Networking::WinSock::{ + closesocket, WSADuplicateSocketA, WSASocketA, INVALID_SOCKET, SOCKET_ERROR, + WSAPROTOCOL_INFOA, + }, Storage::FileSystem::{ FlushFileBuffers, GetFileAttributesA, ReadFile, RemoveDirectoryA, SetEndOfFile, SetFilePointerEx, WriteFile, FILE_ATTRIBUTE_DIRECTORY, FILE_ATTRIBUTE_READONLY, FILE_BEGIN, @@ -42,7 +45,10 @@ use crate::{ posix::{constants::*, types::*, win32_handle_translator::FdHandleEntry, Errno}, }; -use super::{settings::MAX_PATH_LENGTH, win32_handle_translator::HandleTranslator}; +use super::{ + settings::MAX_PATH_LENGTH, + win32_handle_translator::{HandleTranslator, SocketHandle}, +}; use crate::win32call; impl Struct for SYSTEM_INFO {} @@ -142,8 +148,33 @@ pub unsafe fn getppid() -> pid_t { parent_process_id } +impl Struct for WSAPROTOCOL_INFOA {} + pub unsafe fn dup(fildes: int) -> int { - -1 + match HandleTranslator::get_instance().get(fildes) { + Some(FdHandleEntry::Socket(handle)) => { + let mut protocol_info = WSAPROTOCOL_INFOA::new(); + let (result, _) = win32call! { winsock WSADuplicateSocketA(handle.fd, GetCurrentProcessId(), &mut protocol_info) }; + if result == SOCKET_ERROR { + return -1; + } + let (duplicated_socket, _) = + win32call! { WSASocketA(AF_UNIX as _, SOCK_STREAM, 0, &protocol_info, 0, 0)}; + if duplicated_socket == INVALID_SOCKET { + return -1; + } + + HandleTranslator::get_instance().add(FdHandleEntry::Socket(SocketHandle { + fd: duplicated_socket, + recv_timeout: None, + send_timeout: None, + })) + } + _ => { + Errno::set(Errno::EBADF); + -1 + } + } } pub unsafe fn close(fd: int) -> int { From bfea9517ec6e2ce6b1b1774d30755f78ad827e6f Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 20 Jan 2025 20:14:12 +0100 Subject: [PATCH 8/9] [#508] Update changelog --- doc/release-notes/iceoryx2-unreleased.md | 2 ++ iceoryx2-cal/tests/event_tests.rs | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/release-notes/iceoryx2-unreleased.md b/doc/release-notes/iceoryx2-unreleased.md index 1e13d947d..a15b2420c 100644 --- a/doc/release-notes/iceoryx2-unreleased.md +++ b/doc/release-notes/iceoryx2-unreleased.md @@ -13,6 +13,8 @@ * Introduce `socket_pair` abstraction in POSIX wrapper [#508](https://github.com/eclipse-iceoryx/iceoryx2/issues/508) +* Introduce `socket_pair` event concept + [#508](https://github.com/eclipse-iceoryx/iceoryx2/issues/508) * Deadline property for event services [#573](https://github.com/eclipse-iceoryx/iceoryx2/issues/573) * Use 'std_instead_of_core' clippy warning diff --git a/iceoryx2-cal/tests/event_tests.rs b/iceoryx2-cal/tests/event_tests.rs index b228b0d13..b8ad4e7d2 100644 --- a/iceoryx2-cal/tests/event_tests.rs +++ b/iceoryx2-cal/tests/event_tests.rs @@ -713,7 +713,6 @@ mod event { #[test] fn out_of_scope_listener_shall_not_corrupt_notifier() { - set_log_level(LogLevel::Trace); let name = generate_name(); let config = generate_isolated_config::(); From 556c61897d54e5b5fa8e5ef1d900fe255e30aecb Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 21 Jan 2025 11:31:58 +0100 Subject: [PATCH 9/9] [#508] Fix socket timeouts in windows platform --- iceoryx2-cal/tests/event_tests.rs | 1 - iceoryx2-pal/posix/src/windows/fcntl.rs | 8 +++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/iceoryx2-cal/tests/event_tests.rs b/iceoryx2-cal/tests/event_tests.rs index b8ad4e7d2..07c94d5ab 100644 --- a/iceoryx2-cal/tests/event_tests.rs +++ b/iceoryx2-cal/tests/event_tests.rs @@ -19,7 +19,6 @@ mod event { use std::time::Instant; use iceoryx2_bb_container::semantic_string::*; - use iceoryx2_bb_log::{set_log_level, LogLevel}; use iceoryx2_bb_posix::barrier::*; use iceoryx2_bb_system_types::file_name::FileName; use iceoryx2_bb_testing::watchdog::Watchdog; diff --git a/iceoryx2-pal/posix/src/windows/fcntl.rs b/iceoryx2-pal/posix/src/windows/fcntl.rs index c4be6c6c1..d2126b61f 100644 --- a/iceoryx2-pal/posix/src/windows/fcntl.rs +++ b/iceoryx2-pal/posix/src/windows/fcntl.rs @@ -183,7 +183,13 @@ pub unsafe fn fcntl_int(fd: int, cmd: int, arg: int) -> int { } let socket_fd = match HandleTranslator::get_instance().get(fd) { - Some(FdHandleEntry::Socket(socket)) => socket.fd, + Some(FdHandleEntry::Socket(mut socket)) => { + if cmd == F_SETFL && (arg & O_NONBLOCK != 0) { + socket.recv_timeout = None; + HandleTranslator::get_instance().update(FdHandleEntry::Socket(socket)); + } + socket.fd + } Some(FdHandleEntry::UdsDatagramSocket(mut socket)) => { if cmd == F_SETFL && (arg & O_NONBLOCK != 0) { socket.recv_timeout = None;