Skip to content

Commit

Permalink
Do not send 'request_header' over user payload (ros2#76)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Hoinkis <[email protected]>
  • Loading branch information
mossmaurice committed Jan 5, 2023
1 parent 1911d21 commit 917da67
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ void deserialize(
const rosidl_message_type_support_t * type_supports,
void * ros_message);

/// @todo use a variant here for the 2nd parameter to avoid code duplication
void deserialize(
void deserializeRequest(
const char * serialized_msg,
const rosidl_service_type_support_t * type_supports,
void * ros_message);

void deserializeResponse(
const char * serialized_msg,
const rosidl_service_type_support_t * type_supports,
void * ros_message);
Expand Down
20 changes: 19 additions & 1 deletion rmw_iceoryx_cpp/src/internal/iceoryx_deserialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,25 @@ void deserialize(
}
}

void deserialize(
void deserializeRequest(
const char * serialized_msg,
const rosidl_service_type_support_t * type_supports,
void * ros_message)
{
auto ts = get_type_support(type_supports);

if (ts.first == TypeSupportLanguage::CPP) {
auto members_cpp =
static_cast<const rosidl_typesupport_introspection_cpp::ServiceMembers *>(ts.second->data);
rmw_iceoryx_cpp::details_cpp::deserializeRequest(serialized_msg, members_cpp, ros_message);
} else if (ts.first == TypeSupportLanguage::C) {
auto members_c =
static_cast<const rosidl_typesupport_introspection_c__ServiceMembers *>(ts.second->data);
rmw_iceoryx_cpp::details_c::deserializeRequest(serialized_msg, members_c, ros_message);
}
}

void deserializeResponse(
const char * serialized_msg,
const rosidl_service_type_support_t * type_supports,
void * ros_message)
Expand Down
13 changes: 9 additions & 4 deletions rmw_iceoryx_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,16 @@ rmw_take_request(

iceoryx_server->take()
.and_then(
[&](const void * requestPayload) {
user_payload = requestPayload;
[&](const void * iceoryx_request_payload) {
user_payload = iceoryx_request_payload;
chunk_header = iox::mepoo::ChunkHeader::fromUserPayload(user_payload);
iceoryx_request_header = iox::popo::RequestHeader::fromPayload(user_payload);
ret = RMW_RET_OK;
})
.or_else(
[&](iox::popo::ServerRequestResult& error) {
std::cout << "Could not take request! Error: " << error << std::endl;
*taken = false;
RMW_SET_ERROR_MSG("rmw_take_request error!");
ret = RMW_RET_ERROR;
});
Expand All @@ -157,12 +158,16 @@ rmw_take_request(
if (iceoryx_service_abstraction->is_fixed_size_) {
memcpy(ros_request, user_payload, chunk_header->userPayloadSize());
} else {
rmw_iceoryx_cpp::deserialize(
rmw_iceoryx_cpp::deserializeRequest(
static_cast<const char *>(user_payload),
&iceoryx_service_abstraction->type_supports_,
ros_request);
}
iceoryx_server->releaseRequest(user_payload);
request_header->source_timestamp = 0; // Unsupported until needed
rcutils_system_time_now(&request_header->received_timestamp);
request_header->request_id.sequence_number = iceoryx_request_header->getSequenceId();
request_header->request_id.writer_guid[0] = 42; /// @todo

*taken = true;
ret = RMW_RET_OK;
std::cout << "Server took request!" << std::endl;
Expand Down
42 changes: 23 additions & 19 deletions rmw_iceoryx_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,29 @@ rmw_take_response(

iceoryx_client->take()
.and_then(
[&](const void * responsePayload) {
auto responseHeader = iox::popo::ResponseHeader::fromPayload(responsePayload);
/// @todo check writer guid
if (responseHeader->getSequenceId() == request_header->request_id.sequence_number)
[&](const void * iceoryx_response_payload) {
auto iceoryx_response_header = iox::popo::ResponseHeader::fromPayload(iceoryx_response_payload);
/// @todo check writer guid?
request_header->request_id.sequence_number = iceoryx_response_header->getSequenceId();
request_header->source_timestamp = 0; // Unsupported until needed
rcutils_system_time_now(&request_header->received_timestamp);

if (iceoryx_response_header->getSequenceId() == iceoryx_client_abstraction->sequence_id_ - 1)
{
user_payload = responseHeader;
user_payload = iceoryx_response_payload;
chunk_header = iox::mepoo::ChunkHeader::fromUserPayload(user_payload);
ret = RMW_RET_OK;
}
else
{
std::cout << "Got Response with outdated sequence number!" << std::endl;
RMW_SET_ERROR_MSG("Got response with outdated sequence number!");
*taken = false;
ret = RMW_RET_ERROR;
}
})
.or_else(
[&](iox::popo::ChunkReceiveResult) {
*taken = false;
RMW_SET_ERROR_MSG("No chunk in iceoryx_client");
ret = RMW_RET_ERROR;
});
Expand All @@ -90,12 +96,10 @@ rmw_take_response(

// if fixed size, we fetch the data via memcpy
if (iceoryx_client_abstraction->is_fixed_size_) {
memcpy(request_header, user_payload, sizeof(*request_header));
/// @todo cast to uint8_t before doing pointer arithmetic?
memcpy(ros_response, user_payload + sizeof(*request_header), chunk_header->userPayloadSize());
memcpy(ros_response, user_payload, chunk_header->userPayloadSize());
} else {
rmw_iceoryx_cpp::deserialize(
static_cast<const char *>(user_payload), /// @todo add fourth param for 'request_header', but how to find out when header ends, separator char?
rmw_iceoryx_cpp::deserializeResponse(
static_cast<const char *>(user_payload),
&iceoryx_client_abstraction->type_supports_,
ros_response);
}
Expand All @@ -104,7 +108,6 @@ rmw_take_response(
ret = RMW_RET_OK;
std::cout << "Client took response!" << std::endl;

*taken = false;
return ret;
}

Expand Down Expand Up @@ -138,21 +141,19 @@ rmw_send_response(
rmw_ret_t ret = RMW_RET_ERROR;

auto requestHeader = iox::popo::RequestHeader::fromPayload(iceoryx_server_abstraction->request_payload_);
requestHeader->setSequenceId(request_header->sequence_number);

iceoryx_server->loan(requestHeader, iceoryx_server_abstraction->response_size_, iceoryx_server_abstraction->response_alignment_)
.and_then([&](void * responsePayload) {
/// @todo memcpy or serialize the response
// write |-request_header-|-ros_response-| to shared memory
if (iceoryx_server_abstraction->is_fixed_size_)
{
memcpy(responsePayload, request_header, sizeof(*request_header));
memcpy(responsePayload + sizeof(*request_header), ros_response, iceoryx_server_abstraction->response_size_);
memcpy(responsePayload, ros_response, iceoryx_server_abstraction->response_size_);
}
else
{
// message is not fixed size, so we have to serialize
std::vector<char> payload_vector{};
rmw_iceoryx_cpp::serializeRequest(request_header, &iceoryx_server_abstraction->type_supports_, payload_vector);
rmw_iceoryx_cpp::serializeRequest(ros_response, &iceoryx_server_abstraction->type_supports_, payload_vector);
rmw_iceoryx_cpp::serializeResponse(ros_response, &iceoryx_server_abstraction->type_supports_, payload_vector);
memcpy(responsePayload, payload_vector.data(), payload_vector.size());
}
std::cout << "Server sent response!" << std::endl;
Expand All @@ -163,11 +164,14 @@ rmw_send_response(
});
})
.or_else(
[&](auto&) {
[&](auto& error) {
std::cout << "Could not allocate Response! Error: " << error << std::endl;
RMW_SET_ERROR_MSG("rmw_send_response loan error!");
ret = RMW_RET_ERROR;
});

iceoryx_server->releaseRequest(iceoryx_server_abstraction->request_payload_);

return ret;
}
} // extern "C"

0 comments on commit 917da67

Please sign in to comment.