Skip to content

Commit

Permalink
Merge pull request #572 from elfenpiff/iox2-571-fix-history-bug
Browse files Browse the repository at this point in the history
[#571] Fix completion queue capacity exceeded when history > buffer
  • Loading branch information
elfenpiff authored Jan 3, 2025
2 parents f4931fd + f5fa861 commit 7dbabc1
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 11 deletions.
2 changes: 1 addition & 1 deletion config/iceoryx2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ creation-timeout.nanos = 500000000
max-subscribers = 8
max-publishers = 2
max-nodes = 20
publisher-history-size = 1
publisher-history-size = 0
subscriber-max-buffer-size = 2
subscriber-max-borrowed-samples = 2
publisher-max-loaned-samples = 2
Expand Down
3 changes: 2 additions & 1 deletion doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
conflicts when merging.
-->

* Example text [#1](https://github.com/eclipse-iceoryx/iceoryx2/issues/1)
* Completion queue capacity exceeded when history > buffer
[#571](https://github.com/eclipse-iceoryx/iceoryx2/issues/571)

### Refactoring

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ TYPED_TEST(ServicePublishSubscribeTest, update_connections_delivers_history) {
const auto service_name = iox2_testing::generate_service_name();

auto node = NodeBuilder().create<SERVICE_TYPE>().expect("");
auto service = node.service_builder(service_name).template publish_subscribe<uint64_t>().create().expect("");
auto service =
node.service_builder(service_name).template publish_subscribe<uint64_t>().history_size(1).create().expect("");

auto sut_publisher = service.publisher_builder().create().expect("");
const uint64_t payload = 123;
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl Default for Config {
max_subscribers: 8,
max_publishers: 2,
max_nodes: 20,
publisher_history_size: 1,
publisher_history_size: 0,
subscriber_max_buffer_size: 2,
subscriber_max_borrowed_samples: 2,
publisher_max_loaned_samples: 2,
Expand Down
16 changes: 12 additions & 4 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ use iceoryx2_cal::named_concept::{NamedConceptListError, NamedConceptRemoveError
use iceoryx2_cal::shared_memory::ShmPointer;
use iceoryx2_cal::shm_allocator::{AllocationStrategy, PointerOffset, ShmAllocationError};
use iceoryx2_cal::zero_copy_connection::{
ZeroCopyConnection, ZeroCopyCreationError, ZeroCopySendError, ZeroCopySender,
ZeroCopyConnection, ZeroCopyCreationError, ZeroCopyPortDetails, ZeroCopySendError,
ZeroCopySender,
};
use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicU64, IoxAtomicUsize};
use std::any::TypeId;
Expand Down Expand Up @@ -403,7 +404,6 @@ impl<Service: service::Service> PublisherBackend<Service> {
sample_size: usize,
) -> Result<usize, PublisherSendError> {
self.retrieve_returned_samples();

let deliver_call = match self.config.unable_to_deliver_strategy {
UnableToDeliverStrategy::Block => {
<Service::Connection as ZeroCopyConnection>::Sender::blocking_send
Expand Down Expand Up @@ -553,13 +553,21 @@ impl<Service: service::Service> PublisherBackend<Service> {
None => (),
Some(history) => {
let history = unsafe { &mut *history.get() };
for i in 0..history.len() {
let buffer_size = connection.sender.buffer_size();
let history_start = history.len().saturating_sub(buffer_size);

for i in history_start..history.len() {
let old_sample = unsafe { history.get_unchecked(i) };
self.retrieve_returned_samples();

let offset = PointerOffset::from_value(old_sample.offset);
match connection.sender.try_send(offset, old_sample.size) {
Ok(_) => {
Ok(overflow) => {
self.borrow_sample(offset);

if let Some(old) = overflow {
self.release_sample(old);
}
}
Err(e) => {
warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e);
Expand Down
1 change: 1 addition & 0 deletions iceoryx2/src/port/update_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub trait UpdateConnections {
/// #
/// # let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
/// # .publish_subscribe::<u64>()
/// # .history_size(1)
/// # .open_or_create()?;
/// #
/// # let publisher = service.publisher_builder().create()?;
Expand Down
4 changes: 2 additions & 2 deletions iceoryx2/src/service/static_config/messaging_pattern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ mod tests {
let cfg = config::Config::default();
let p1 = MessagingPattern::PublishSubscribe(publish_subscribe::StaticConfig::new(&cfg));
let sut = p1.required_amount_of_samples_per_data_segment(0);
assert_that!(sut, eq 33);
assert_that!(sut, eq 32);
let sut = p1.required_amount_of_samples_per_data_segment(1);
assert_that!(sut, eq 34);
assert_that!(sut, eq 33);

let e1 = MessagingPattern::Event(event::StaticConfig::new(&cfg));
let sut = e1.required_amount_of_samples_per_data_segment(1);
Expand Down
3 changes: 2 additions & 1 deletion iceoryx2/tests/sample_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ mod sample {

#[test]
fn sample_from_dropped_subscriber_does_not_block_new_subscribers<Sut: Service>() {
let config = generate_isolated_config();
let mut config = generate_isolated_config();
config.defaults.publish_subscribe.publisher_history_size = 1;
let test_context = TestContext::<Sut>::new(&config);
const PAYLOAD_1: u64 = 7781123554;

Expand Down
82 changes: 82 additions & 0 deletions iceoryx2/tests/service_publish_subscribe_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3200,6 +3200,88 @@ mod service_publish_subscribe {
deliver_history_with_increasing_samples_works::<Sut>(AllocationStrategy::PowerOfTwo);
}

#[test]
fn does_not_leak_when_subscriber_has_smaller_buffer_size_than_history_size<Sut: Service>() {
let _watchdog = Watchdog::new();
const HISTORY_SIZE: usize = 1000;
const REPETITIONS: usize = 10;
let service_name = generate_name();
let config = generate_isolated_config();
let node = NodeBuilder::new().config(&config).create::<Sut>().unwrap();

let finish_setup = Barrier::new(2);
let start = Barrier::new(2);
let end = Barrier::new(2);

std::thread::scope(|s| {
let update_connection_thread = s.spawn(|| {
let service = node
.service_builder(&service_name)
.publish_subscribe::<usize>()
.max_publishers(1)
.max_subscribers(1)
.subscriber_max_borrowed_samples(1)
.history_size(HISTORY_SIZE)
.subscriber_max_buffer_size(HISTORY_SIZE)
.create()
.unwrap();

let publisher = service
.publisher_builder()
.max_loaned_samples(1)
.create()
.unwrap();

for n in 0..HISTORY_SIZE {
publisher.send_copy(n).unwrap();
}

finish_setup.wait();

for _ in 0..REPETITIONS {
start.wait();

publisher.update_connections().unwrap();

end.wait();
}
});

let new_subscriber_thread = s.spawn(|| {
finish_setup.wait();

let service = node
.service_builder(&service_name)
.publish_subscribe::<usize>()
.open()
.unwrap();

for _ in 0..REPETITIONS {
let subscriber = service
.subscriber_builder()
.buffer_size(1)
.create()
.unwrap();
start.wait();

let mut previous_value = 0;
for _ in 0..HISTORY_SIZE {
let sample = subscriber.receive().unwrap();
if let Some(sample) = sample {
assert_that!(*sample, ge previous_value);
previous_value = *sample;
}
}

end.wait();
}
});

update_connection_thread.join().unwrap();
new_subscriber_thread.join().unwrap();
});
}

#[instantiate_tests(<iceoryx2::service::ipc::Service>)]
mod ipc {}

Expand Down

0 comments on commit 7dbabc1

Please sign in to comment.