From 970d9867f3883f6175dbeef0776112b7d01c4f59 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Fri, 19 Jan 2018 16:14:40 -0800 Subject: [PATCH] take raw --- rmw_fastrtps_cpp/src/rmw_take.cpp | 81 +++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/rmw_fastrtps_cpp/src/rmw_take.cpp b/rmw_fastrtps_cpp/src/rmw_take.cpp index 9231b3129..787ae02d7 100644 --- a/rmw_fastrtps_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_cpp/src/rmw_take.cpp @@ -106,4 +106,85 @@ rmw_take_with_info( return RMW_RET_OK; } + +rmw_ret_t +rmw_take_raw( + const rmw_subscription_t * subscription, + rmw_message_raw_t * raw_message, + bool * taken) +{ + assert(subscription); + assert(raw_message); + assert(taken); + + *taken = false; + + if (subscription->implementation_identifier != eprosima_fastrtps_identifier) { + RMW_SET_ERROR_MSG("publisher handle not from this implementation"); + return RMW_RET_ERROR; + } + + CustomSubscriberInfo * info = static_cast(subscription->data); + assert(info); + + eprosima::fastcdr::FastBuffer buffer; + eprosima::fastrtps::SampleInfo_t sinfo; + + if (info->subscriber_->takeNextData(&buffer, &sinfo)) { + info->listener_->data_taken(); + + if (sinfo.sampleKind == ALIVE) { + raw_message->buffer_length = buffer.getBufferSize(); + raw_message->buffer = (char *)malloc(sizeof(char) * raw_message->buffer_length); + memcpy(raw_message->buffer, buffer.getBuffer(), raw_message->buffer_length); + *taken = true; + } + } + + return RMW_RET_OK; +} + +rmw_ret_t +rmw_take_raw_with_info( + const rmw_subscription_t * subscription, + rmw_message_raw_t * raw_message, + bool * taken, + rmw_message_info_t * message_info) +{ + assert(subscription); + assert(raw_message); + assert(taken); + + *taken = false; + + if (subscription->implementation_identifier != eprosima_fastrtps_identifier) { + RMW_SET_ERROR_MSG("publisher handle not from this implementation"); + return RMW_RET_ERROR; + } + + CustomSubscriberInfo * info = static_cast(subscription->data); + assert(info); + + eprosima::fastcdr::FastBuffer buffer; + eprosima::fastrtps::SampleInfo_t sinfo; + + if (info->subscriber_->takeNextData(&buffer, &sinfo)) { + info->listener_->data_taken(); + + if (sinfo.sampleKind == ALIVE) { + raw_message->buffer_length = buffer.getBufferSize(); + raw_message->buffer = (char *)malloc(sizeof(char) * raw_message->buffer_length); + memcpy(raw_message->buffer, buffer.getBuffer(), raw_message->buffer_length); + rmw_gid_t * sender_gid = &message_info->publisher_gid; + sender_gid->implementation_identifier = eprosima_fastrtps_identifier; + memset(sender_gid->data, 0, RMW_GID_STORAGE_SIZE); + memcpy(sender_gid->data, &sinfo.sample_identity.writer_guid(), + sizeof(eprosima::fastrtps::rtps::GUID_t)); + *taken = true; + } + } + + return RMW_RET_OK; +} + } // extern "C"