Skip to content

Commit

Permalink
Merge pull request #597 from elfenpiff/iox2-596-zero-copy-connection-…
Browse files Browse the repository at this point in the history
…cleanup

[#596] zero copy connection cleanup
  • Loading branch information
elfenpiff authored Jan 20, 2025
2 parents 506dcff + 87cb99c commit dd42fc9
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 27 deletions.
3 changes: 3 additions & 0 deletions doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
* Increase max supported shared memory size in Windows that restricts
the maximum supported payload size
[#575](https://github.com/eclipse-iceoryx/iceoryx2/issues/575)
* Undefined behavior due to ZeroCopyConnection removal when stale resources
are cleaned up
[#596](https://github.com/eclipse-iceoryx/iceoryx2/issues/596)

### Refactoring

Expand Down
64 changes: 63 additions & 1 deletion iceoryx2-cal/src/zero_copy_connection/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub mod details {
index_queue::RelocatableIndexQueue,
safely_overflowing_index_queue::RelocatableSafelyOverflowingIndexQueue,
};
use iceoryx2_bb_log::{fail, fatal_panic};
use iceoryx2_bb_log::{fail, fatal_panic, warn};
use iceoryx2_bb_posix::adaptive_wait::AdaptiveWaitBuilder;

use self::used_chunk_list::RelocatableUsedChunkList;
Expand Down Expand Up @@ -123,6 +123,12 @@ pub mod details {
state_to_remove: State,
) {
let mut current_state = storage.get().state.load(Ordering::Relaxed);
if current_state == State::MarkedForDestruction.value() {
warn!(from "common::ZeroCopyConnection::cleanup_shared_memory()",
"Trying to remove state {:?} on the connection {:?} which is already marked for destruction.", state_to_remove, storage.name());
return;
}

loop {
let new_state = if current_state == state_to_remove.value() {
State::MarkedForDestruction.value()
Expand Down Expand Up @@ -783,12 +789,68 @@ pub mod details {
Ok(())
}
}
impl<Storage: DynamicStorage<SharedManagementData>> Connection<Storage> {
    /// Opens the existing dynamic storage called `name` using the dynamic storage
    /// configuration contained in `config`.
    ///
    /// Every failure of the underlying open call is logged, prefixed with `msg`, and
    /// translated into the matching [`ZeroCopyPortRemoveError`].
    fn open_storage(
        name: &FileName,
        config: &<Connection<Storage> as NamedConceptMgmt>::Configuration,
        msg: &str,
    ) -> Result<Storage, ZeroCopyPortRemoveError> {
        let log_origin = "Connection::open_storage()";

        let open_result = <<Storage as DynamicStorage<SharedManagementData>>::Builder<'_> as NamedConceptBuilder<
            Storage>>::new(name)
            .config(&config.dynamic_storage_config)
            .open();

        match open_result {
            Err(DynamicStorageOpenError::DoesNotExist) => {
                fail!(from log_origin, with ZeroCopyPortRemoveError::DoesNotExist,
                    "{msg} since the underlying dynamic storage does not exist.");
            }
            Err(DynamicStorageOpenError::VersionMismatch) => {
                fail!(from log_origin, with ZeroCopyPortRemoveError::VersionMismatch,
                    "{msg} since the underlying dynamic storage has a different iceoryx2 version.");
            }
            // NOTE(review): a not yet finalized initialization is reported as
            // insufficient permissions - confirm that this mapping is intended.
            Err(DynamicStorageOpenError::InitializationNotYetFinalized) => {
                fail!(from log_origin, with ZeroCopyPortRemoveError::InsufficientPermissions,
                    "{msg} due to insufficient permissions.");
            }
            Err(DynamicStorageOpenError::InternalError) => {
                fail!(from log_origin, with ZeroCopyPortRemoveError::InternalError,
                    "{msg} due to an internal error.");
            }
            Ok(storage) => Ok(storage),
        }
    }
}

impl<Storage: DynamicStorage<SharedManagementData>> ZeroCopyConnection for Connection<Storage> {
type Sender = Sender<Storage>;
type Builder = Builder<Storage>;
type Receiver = Receiver<Storage>;

/// Opens the storage of the connection `name` and releases the sender state from it.
///
/// # Safety
///
/// * see the contract of `ZeroCopyConnection::remove_sender`
unsafe fn remove_sender(
    name: &FileName,
    config: &Self::Configuration,
) -> Result<(), ZeroCopyPortRemoveError> {
    let failure_msg = "Unable to remove forcefully the sender of the Zero Copy Connection";

    // An open failure is forwarded unchanged, otherwise the sender state is cleaned up.
    Self::open_storage(name, config, failure_msg)
        .map(|storage| cleanup_shared_memory(&storage, State::Sender))
}

/// Opens the storage of the connection `name` and releases the receiver state from it.
///
/// # Safety
///
/// * see the contract of `ZeroCopyConnection::remove_receiver`
unsafe fn remove_receiver(
    name: &FileName,
    config: &Self::Configuration,
) -> Result<(), ZeroCopyPortRemoveError> {
    let failure_msg = "Unable to remove forcefully the receiver of the Zero Copy Connection";

    // An open failure is forwarded unchanged, otherwise the receiver state is cleaned up.
    Self::open_storage(name, config, failure_msg)
        .map(|storage| cleanup_shared_memory(&storage, State::Receiver))
}

/// This implementation always reports support for safe overflow.
fn does_support_safe_overflow() -> bool {
true
}
Expand Down
32 changes: 32 additions & 0 deletions iceoryx2-cal/src/zero_copy_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ use crate::static_storage::file::{NamedConcept, NamedConceptBuilder, NamedConcep
pub use iceoryx2_bb_system_types::file_name::*;
pub use iceoryx2_bb_system_types::path::Path;

/// Failures that can occur when a port is forcefully removed from a
/// [`ZeroCopyConnection`] via [`ZeroCopyConnection::remove_sender()`] or
/// [`ZeroCopyConnection::remove_receiver()`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZeroCopyPortRemoveError {
/// The removal failed due to an internal error.
InternalError,
/// The underlying resources were created by a different iceoryx2 version.
VersionMismatch,
/// The underlying resources could not be accessed due to insufficient permissions.
InsufficientPermissions,
/// The underlying resources of the connection do not exist.
DoesNotExist,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZeroCopyCreationError {
InternalError,
Expand Down Expand Up @@ -166,6 +174,30 @@ pub trait ZeroCopyConnection: Debug + Sized + NamedConceptMgmt {
type Receiver: ZeroCopyReceiver;
type Builder: ZeroCopyConnectionBuilder<Self>;

/// Removes the [`ZeroCopySender`] forcefully from the [`ZeroCopyConnection`]. This shall only
/// be called when the [`ZeroCopySender`] died and the connection shall be cleaned up without
/// causing any problems on the living [`ZeroCopyReceiver`] side.
///
/// # Errors
///
/// Returns a [`ZeroCopyPortRemoveError`] when the sender could not be removed.
///
/// # Safety
///
/// * must ensure that the [`ZeroCopySender`] died while being connected.
unsafe fn remove_sender(
name: &FileName,
config: &Self::Configuration,
) -> Result<(), ZeroCopyPortRemoveError>;

/// Removes the [`ZeroCopyReceiver`] forcefully from the [`ZeroCopyConnection`]. This shall
/// only be called when the [`ZeroCopyReceiver`] died and the connection shall be cleaned up
/// without causing any problems on the living [`ZeroCopySender`] side.
///
/// # Errors
///
/// Returns a [`ZeroCopyPortRemoveError`] when the receiver could not be removed.
///
/// # Safety
///
/// * must ensure that the [`ZeroCopyReceiver`] died while being connected.
unsafe fn remove_receiver(
name: &FileName,
config: &Self::Configuration,
) -> Result<(), ZeroCopyPortRemoveError>;

/// Returns true if the connection supports safe overflow
fn does_support_safe_overflow() -> bool {
false
Expand Down
65 changes: 65 additions & 0 deletions iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,71 @@ mod zero_copy_connection {
assert_that!(returned_sample, eq Some(overflow_sample));
}

/// Leaks both ports of a connection, then removes the sender followed by the receiver
/// and verifies that the connection exists until the second port was removed.
#[test]
fn explicitly_releasing_first_sender_then_receiver_removes_connection<
    Sut: ZeroCopyConnection,
>() {
    let name = generate_name();
    let config = generate_isolated_config::<Sut>();

    // Both ports are wrapped in `ManuallyDrop` so that their destructors never run,
    // the connection state is left behind as if the ports were never released.
    let _leaked_receiver = core::mem::ManuallyDrop::new(
        Sut::Builder::new(&name)
            .number_of_samples_per_segment(NUMBER_OF_SAMPLES)
            .config(&config)
            .create_receiver()
            .unwrap(),
    );
    let _leaked_sender = core::mem::ManuallyDrop::new(
        Sut::Builder::new(&name)
            .number_of_samples_per_segment(NUMBER_OF_SAMPLES)
            .config(&config)
            .create_sender()
            .unwrap(),
    );

    assert_that!(Sut::does_exist_cfg(&name, &config), eq Ok(true));
    assert_that!(unsafe { Sut::remove_sender(&name, &config) }, is_ok);
    assert_that!(Sut::does_exist_cfg(&name, &config), eq Ok(true));
    assert_that!(unsafe { Sut::remove_receiver(&name, &config) }, is_ok);
    assert_that!(Sut::does_exist_cfg(&name, &config), eq Ok(false));
}

/// Leaks both ports of a connection, then removes the receiver followed by the sender
/// and verifies that the connection exists until the second port was removed.
#[test]
fn explicitly_releasing_first_receiver_then_sender_removes_connection<
    Sut: ZeroCopyConnection,
>() {
    let name = generate_name();
    let config = generate_isolated_config::<Sut>();

    // Both ports are wrapped in `ManuallyDrop` so that their destructors never run,
    // the connection state is left behind as if the ports were never released.
    let _leaked_receiver = core::mem::ManuallyDrop::new(
        Sut::Builder::new(&name)
            .number_of_samples_per_segment(NUMBER_OF_SAMPLES)
            .config(&config)
            .create_receiver()
            .unwrap(),
    );
    let _leaked_sender = core::mem::ManuallyDrop::new(
        Sut::Builder::new(&name)
            .number_of_samples_per_segment(NUMBER_OF_SAMPLES)
            .config(&config)
            .create_sender()
            .unwrap(),
    );

    assert_that!(Sut::does_exist_cfg(&name, &config), eq Ok(true));
    assert_that!(unsafe { Sut::remove_receiver(&name, &config) }, is_ok);
    assert_that!(Sut::does_exist_cfg(&name, &config), eq Ok(true));
    assert_that!(unsafe { Sut::remove_sender(&name, &config) }, is_ok);
    assert_that!(Sut::does_exist_cfg(&name, &config), eq Ok(false));
}

/// Removing a receiver or a sender from a connection that was never created must fail
/// with `ZeroCopyPortRemoveError::DoesNotExist`.
#[test]
fn removing_port_from_non_existing_connection_leads_to_error<Sut: ZeroCopyConnection>() {
// No port is ever created under the generated name within this test.
let name = generate_name();
let config = generate_isolated_config::<Sut>();

assert_that!(unsafe { Sut::remove_receiver(&name, &config) }, eq Err(ZeroCopyPortRemoveError::DoesNotExist));
assert_that!(unsafe { Sut::remove_sender(&name, &config) }, eq Err(ZeroCopyPortRemoveError::DoesNotExist));
}

#[instantiate_tests(<zero_copy_connection::posix_shared_memory::Connection>)]
mod posix_shared_memory {}

Expand Down
73 changes: 47 additions & 26 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ use iceoryx2_cal::named_concept::{NamedConceptListError, NamedConceptRemoveError
use iceoryx2_cal::shared_memory::ShmPointer;
use iceoryx2_cal::shm_allocator::{AllocationStrategy, PointerOffset, ShmAllocationError};
use iceoryx2_cal::zero_copy_connection::{
ZeroCopyConnection, ZeroCopyCreationError, ZeroCopyPortDetails, ZeroCopySendError,
ZeroCopySender,
ZeroCopyConnection, ZeroCopyCreationError, ZeroCopyPortDetails, ZeroCopyPortRemoveError,
ZeroCopySendError, ZeroCopySender,
};
use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicU64, IoxAtomicUsize};

Expand Down Expand Up @@ -232,7 +232,9 @@ impl std::error::Error for PublisherSendError {}

/// Failures that can occur when a publisher or subscriber port is removed from all of
/// its connections.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum RemovePubSubPortFromAllConnectionsError {
/// The connection no longer existed when the port should have been removed.
CleanupRaceDetected,
/// The port could not be removed due to insufficient permissions.
InsufficientPermissions,
/// The connection has a different iceoryx2 version.
VersionMismatch,
/// The port removal failed due to an internal error.
InternalError,
}

Expand Down Expand Up @@ -1158,6 +1160,33 @@ fn connections<Service: service::Service>(
}
}

fn handle_port_remove_error(
result: Result<(), ZeroCopyPortRemoveError>,
origin: &str,
msg: &str,
connection: &FileName,
) -> Result<(), RemovePubSubPortFromAllConnectionsError> {
match result {
Ok(()) => Ok(()),
Err(ZeroCopyPortRemoveError::DoesNotExist) => {
debug!(from origin, "{} since the connection ({:?}) no longer exists! This could indicate a race in the node cleanup algorithm or that the underlying resources were removed manually.", msg, connection);
Err(RemovePubSubPortFromAllConnectionsError::CleanupRaceDetected)
}
Err(ZeroCopyPortRemoveError::InsufficientPermissions) => {
debug!(from origin, "{} due to insufficient permissions to remove the connection ({:?}).", msg, connection);
Err(RemovePubSubPortFromAllConnectionsError::InsufficientPermissions)
}
Err(ZeroCopyPortRemoveError::VersionMismatch) => {
debug!(from origin, "{} since connection ({:?}) has a different iceoryx2 version.", msg, connection);
Err(RemovePubSubPortFromAllConnectionsError::VersionMismatch)
}
Err(ZeroCopyPortRemoveError::InternalError) => {
debug!(from origin, "{} due to insufficient permissions to remove the connection ({:?}).", msg, connection);
Err(RemovePubSubPortFromAllConnectionsError::InternalError)
}
}
}

pub(crate) unsafe fn remove_publisher_from_all_connections<Service: service::Service>(
port_id: &UniquePublisherId,
config: &config::Config,
Expand All @@ -1176,19 +1205,15 @@ pub(crate) unsafe fn remove_publisher_from_all_connections<Service: service::Ser
for connection in connection_list {
let publisher_id = extract_publisher_id_from_connection(&connection);
if publisher_id == *port_id {
match <Service::Connection as NamedConceptMgmt>::remove_cfg(
let result = handle_port_remove_error(
Service::Connection::remove_sender(&connection, &connection_config),
&origin,
msg,
&connection,
&connection_config,
) {
Ok(_) => (),
Err(NamedConceptRemoveError::InsufficientPermissions) => {
debug!(from origin, "{} due to insufficient permissions to remove the connection ({:?}).", msg, connection);
ret_val = Err(RemovePubSubPortFromAllConnectionsError::InsufficientPermissions);
}
Err(NamedConceptRemoveError::InternalError) => {
debug!(from origin, "{} due to insufficient permissions to remove the connection ({:?}).", msg, connection);
ret_val = Err(RemovePubSubPortFromAllConnectionsError::InternalError);
}
);

if ret_val.is_ok() {
ret_val = result;
}
}
}
Expand All @@ -1214,19 +1239,15 @@ pub(crate) unsafe fn remove_subscriber_from_all_connections<Service: service::Se
for connection in connection_list {
let subscriber_id = extract_subscriber_id_from_connection(&connection);
if subscriber_id == *port_id {
match <Service::Connection as NamedConceptMgmt>::remove_cfg(
let result = handle_port_remove_error(
Service::Connection::remove_receiver(&connection, &connection_config),
&origin,
msg,
&connection,
&connection_config,
) {
Ok(_) => (),
Err(NamedConceptRemoveError::InsufficientPermissions) => {
debug!(from origin, "{} due to insufficient permissions to remove the connection ({:?}).", msg, connection);
ret_val = Err(RemovePubSubPortFromAllConnectionsError::InsufficientPermissions);
}
Err(NamedConceptRemoveError::InternalError) => {
debug!(from origin, "{} due to insufficient permissions to remove the connection ({:?}).", msg, connection);
ret_val = Err(RemovePubSubPortFromAllConnectionsError::InternalError);
}
);

if ret_val.is_ok() {
ret_val = result;
}
}
}
Expand Down

0 comments on commit dd42fc9

Please sign in to comment.