Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to async state service request #461

Merged
merged 7 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions include/ignition/gazebo/gui/GuiRunner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ class IGNITION_GAZEBO_VISIBLE GuiRunner : public QObject
/// \brief Make a new state request to the server.
public slots: void RequestState();

/// \brief Callback for the state service.
/// \brief Callback for the async state service.
/// \param[in] _res Response containing new state.
/// \param[in] _result True if successful.
private: void OnStateService(const msgs::SerializedStepMap &_res,
const bool _result);
private: void OnStateAsyncService(const msgs::SerializedStepMap &_res);

/// \brief Callback when a new state is received from the server.
/// \param[in] _msg New state message.
Expand Down
27 changes: 18 additions & 9 deletions src/gui/GuiRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ GuiRunner::~GuiRunner() = default;
/////////////////////////////////////////////////
void GuiRunner::RequestState()
{
this->node.Request(this->stateTopic, &GuiRunner::OnStateService, this);
// set up service for async state response callback
std::string id = std::to_string(gui::App()->applicationPid());
std::string reqSrv =
this->node.Options().NameSpace() + "/" + id + "/state_async";
this->node.Advertise(reqSrv, &GuiRunner::OnStateAsyncService, this);
ignition::msgs::StringMsg req;
req.set_data(reqSrv);

// send async state request
this->node.Request(this->stateTopic + "_async", req);
}

/////////////////////////////////////////////////
Expand All @@ -77,17 +86,17 @@ void GuiRunner::OnPluginAdded(const QString &_objectName)
}

/////////////////////////////////////////////////
void GuiRunner::OnStateService(const msgs::SerializedStepMap &_res,
const bool _result)
void GuiRunner::OnStateAsyncService(const msgs::SerializedStepMap &_res)
{
if (!_result)
{
ignerr << "Service call failed for [" << this->stateTopic << "]"
<< std::endl;
return;
}
this->OnState(_res);

// todo(anyone) store reqSrv string in a member variable and use it here
// and in RequestState()
std::string id = std::to_string(gui::App()->applicationPid());
std::string reqSrv =
this->node.Options().NameSpace() + "/" + id + "/state_async";
this->node.UnadvertiseSrv(reqSrv);

// Only subscribe to periodic updates after receiving initial state
if (this->node.SubscribedTopics().empty())
this->node.Subscribe(this->stateTopic, &GuiRunner::OnState, this);
Expand Down
1 change: 0 additions & 1 deletion src/systems/physics/Physics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ void PhysicsPrivate::CreatePhysicsEntities(const EntityComponentManager &_ecm)
<< std::endl;
return true;
}

// TODO(anyone) Don't load models unless they have collisions

// Check if parent world / model exists
Expand Down
40 changes: 40 additions & 0 deletions src/systems/scene_broadcaster/SceneBroadcaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <chrono>
#include <condition_variable>
#include <string>
#include <unordered_set>

#include <ignition/common/Profiler.hh>
#include <ignition/math/graph/Graph.hh>
Expand Down Expand Up @@ -75,6 +76,10 @@ class ignition::gazebo::systems::SceneBroadcasterPrivate
/// \return True if successful.
public: bool StateService(ignition::msgs::SerializedStepMap &_res);

/// \brief Callback for state service - non blocking.
/// \param[out] _res Response containing the last available full state.
public: void StateAsyncService(const ignition::msgs::StringMsg &_req);

/// \brief Updates the scene graph when entities are added
/// \param[in] _manager The entity component manager
public: void SceneGraphAddEntities(const EntityComponentManager &_manager);
Expand Down Expand Up @@ -190,6 +195,9 @@ class ignition::gazebo::systems::SceneBroadcasterPrivate

/// \brief Flag used to indicate if the state service was called.
public: bool stateServiceRequest{false};

/// \brief A list of async state requests
public: std::unordered_set<std::string> stateRequests;
};

//////////////////////////////////////////////////
Expand Down Expand Up @@ -270,6 +278,7 @@ void SceneBroadcaster::PostUpdate(const UpdateInfo &_info,

if (this->dataPtr->stateServiceRequest || shouldPublish)
{
std::unique_lock<std::mutex> lock(this->dataPtr->stateMutex);
this->dataPtr->stepMsg.Clear();

set(this->dataPtr->stepMsg.mutable_stats(), _info);
Expand All @@ -294,6 +303,16 @@ void SceneBroadcaster::PostUpdate(const UpdateInfo &_info,
this->dataPtr->stateCv.notify_all();
}

// process async state requests
if (!this->dataPtr->stateRequests.empty())
{
for (const auto &reqSrv : this->dataPtr->stateRequests)
{
this->dataPtr->node->Request(reqSrv, this->dataPtr->stepMsg);
}
this->dataPtr->stateRequests.clear();
}

// Poses periodically + change events
// TODO(louise) Send changed state periodically instead, once it reflects
// changed components
Expand Down Expand Up @@ -448,6 +467,8 @@ void SceneBroadcasterPrivate::SetupTransport(const std::string &_worldName)
<< graphService << "]" << std::endl;

// State service
// Note: GuiRunner used to call this service but it is now using the async
// version (state_async)
std::string stateService{"state"};

this->node->Advertise(stateService, &SceneBroadcasterPrivate::StateService,
Expand All @@ -456,6 +477,15 @@ void SceneBroadcasterPrivate::SetupTransport(const std::string &_worldName)
ignmsg << "Serving full state on [" << opts.NameSpace() << "/"
<< stateService << "]" << std::endl;

// Async State service
std::string stateAsyncService{"state_async"};

this->node->Advertise(stateAsyncService,
&SceneBroadcasterPrivate::StateAsyncService, this);

ignmsg << "Serving full state (async) on [" << opts.NameSpace() << "/"
<< stateAsyncService << "]" << std::endl;

// Scene info topic
std::string sceneTopic{"/world/" + _worldName + "/scene/info"};

Expand Down Expand Up @@ -523,6 +553,15 @@ bool SceneBroadcasterPrivate::SceneInfoService(ignition::msgs::Scene &_res)
return true;
}

//////////////////////////////////////////////////
void SceneBroadcasterPrivate::StateAsyncService(
const ignition::msgs::StringMsg &_req)
{
std::unique_lock<std::mutex> lock(this->stateMutex);
this->stateServiceRequest = true;
this->stateRequests.insert(_req.data());
}

//////////////////////////////////////////////////
bool SceneBroadcasterPrivate::StateService(
ignition::msgs::SerializedStepMap &_res)
Expand All @@ -531,6 +570,7 @@ bool SceneBroadcasterPrivate::StateService(

// Lock and wait for an iteration to be run and fill the state
std::unique_lock<std::mutex> lock(this->stateMutex);

this->stateServiceRequest = true;
auto success = this->stateCv.wait_for(lock, 5s, [&]
{
Expand Down
25 changes: 25 additions & 0 deletions test/integration/scene_broadcaster_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ TEST_P(SceneBroadcasterTest, State)
checkMsg(_res, 3);
};

// async state request with full state response
std::function<void(const msgs::SerializedStepMap &)> cbAsync =
[&](const msgs::SerializedStepMap &_res)
{
checkMsg(_res, 16);
};

// The request is blocking even though it's meant to be async, so we spin a
// thread
auto request = [&]()
Expand Down Expand Up @@ -429,6 +436,24 @@ TEST_P(SceneBroadcasterTest, State)
while (!received && sleep++ < maxSleep)
IGN_SLEEP_MS(100);
EXPECT_TRUE(received);

// test async state request
received = false;
std::string reqSrv = "/state_async_callback_test";
node.Advertise(reqSrv, cbAsync);

ignition::msgs::StringMsg req;
req.set_data(reqSrv);
node.Request("/world/default/state_async", req);

sleep = 0;
while (!received && sleep++ < maxSleep)
{
// Run server
server.Run(true, 1, false);
IGN_SLEEP_MS(100);
}
EXPECT_TRUE(received);
}

/////////////////////////////////////////////////
Expand Down