Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Add option to enable/disable at runtime #74

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ An example launch file called `sample_application.launch` is included in this pr
- `roslaunch `[`raspicam_node`]`camerav2_410x308_30fps.launch`
- `roslaunch h264_video_encoder sample_application.launch`
- `roslaunch kinesis_video_streamer sample_application.launch`
- Start/stop streaming with `rosservice call /kinesis_video_streamer/command "enable: true/false"`
- Log into your AWS Console to see the availabe Kinesis Video stream.
- For other platforms, replace step 1 with an equivalent command to launch your camera node. Reconfigure the topic names accordingly.

Expand All @@ -142,6 +143,7 @@ The parameters below apply to the node as a whole and are not specific to any on
| Parameter Name | Description | Type |
| -------------- | -----------------------------------------------------------| ------------- |
| aws_client_configuration/region | The AWS region which the video should be streamed to. | *string* |
| kinesis_video/enabled | Enable/disable on start-up. | *bool* |
| kinesis_video/stream_count | The number of streams you wish to load and transmit. Each stream should have its corresponding parameter set as described below. | *int* |
| kinesis_video/log4cplus_config | (optional) Config file path for the log4cplus logger, which is used by the Kinesis Video Producer SDK. | *string* |

Expand Down
18 changes: 17 additions & 1 deletion kinesis_video_streamer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,21 @@ find_package(catkin REQUIRED COMPONENTS
kinesis_video_msgs
roscpp
image_transport
message_generation
std_msgs)
catkin_package()


add_service_files(
DIRECTORY srv
FILES
Command.srv
)
generate_messages()

catkin_package(
CATKIN_DEPENDS
message_runtime
)

###########
## Build ##
Expand All @@ -37,6 +50,9 @@ target_include_directories(${PROJECT_NAME}_lib PUBLIC include ${catkin_INCLUDE_D
## Specify libraries to link a library or executable target against
target_link_libraries(${PROJECT_NAME} ${catkin_LIBRARIES} ${kinesis_manager_LIBRARIES} ${PRODUCER_LIBRARY} ${LOG_LIBRARY} ${CURL_LIBRARIES})
target_link_libraries(${PROJECT_NAME}_lib ${catkin_LIBRARIES} ${kinesis_manager_LIBRARIES} ${PRODUCER_LIBRARY} ${CURL_LIBRARIES})
## Specify dependencies
add_dependencies(${PROJECT_NAME} ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(${PROJECT_NAME}_lib ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})

#############
## Install ##
Expand Down
12 changes: 12 additions & 0 deletions kinesis_video_streamer/include/kinesis_video_streamer/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <ros/ros.h>
#include <aws_ros1_common/sdk_utils/ros1_node_parameter_reader.h>
#include <kinesis_video_streamer/Command.h>

namespace Aws {
namespace Kinesis {
Expand All @@ -40,6 +41,14 @@ class StreamerNode : public ros::NodeHandle

KinesisManagerStatus InitializeStreamSubscriptions();

KinesisManagerStatus UninitializeStreamSubscriptions();

bool CommandCb(kinesis_video_streamer::Command::Request &req, kinesis_video_streamer::Command::Response &res);

bool Command(bool enable);

void PublishStatus(bool enable);

void Spin();

void set_subscription_installer(std::shared_ptr<RosStreamSubscriptionInstaller> subscription_installer);
Expand All @@ -49,6 +58,9 @@ class StreamerNode : public ros::NodeHandle
std::shared_ptr<RosStreamSubscriptionInstaller> subscription_installer_;
std::shared_ptr<KinesisStreamManager> stream_manager_;
StreamDefinitionProvider stream_definition_provider_;
ros::ServiceServer srv_command_;
ros::Publisher pub_enabled_;
bool enabled_;
};

} // namespace Kinesis
Expand Down
2 changes: 2 additions & 0 deletions kinesis_video_streamer/launch/kinesis_video_streamer.launch
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@param stream_config Configuration for the (first) stream. If provided, rosparam will attempt to load the file into the private namespace
of the node. Otherwise, the example configuration file 'sample_configuration.yaml' will be loaded.
@param aws_client_configuration/region Defaults to us-west-2.
@param kinesis_video/enabled Enable/disable on start-up.
@param kinesis_video/stream_count Number of streams to load & transmit. Stream definition should be provided for each stream.
@param kinesis_video/log4cplus_config Optional path for a log4cplus config which will be used by the Kinesis Video Producer SDK.
@param kinesis_video/stream<id>/<parameter_name> as described in https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-data.html#how-data-header-streamdefinition
Expand All @@ -26,5 +27,6 @@

<param name="kinesis_video/log4cplus_config" value="$(find kinesis_video_streamer)/kvs_log_configuration" />
<param name="kinesis_video/stream_count" value="1" />
<param name="kinesis_video/enabled" value="true" />
</node>
</launch>
4 changes: 4 additions & 0 deletions kinesis_video_streamer/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

<buildtool_depend>catkin</buildtool_depend>

<build_depend>message_generation</build_depend>

<exec_depend>message_runtime</exec_depend>

<depend>aws_common</depend>
<depend>aws_ros1_common</depend>
<depend>kinesis_manager</depend>
Expand Down
5 changes: 0 additions & 5 deletions kinesis_video_streamer/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ int main(int argc, char * argv[])
return shutdown(options, status);
}

status = streamer.InitializeStreamSubscriptions();
if (!KINESIS_MANAGER_STATUS_SUCCEEDED(status)) {
return shutdown(options, status);
}

AWS_LOG_INFO(__func__, "Starting Kinesis Video Node...");
streamer.Spin();

Expand Down
75 changes: 74 additions & 1 deletion kinesis_video_streamer/src/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <kinesis_video_streamer/streamer.h>
#include <kinesis_video_streamer/subscriber_callbacks.h>
#include <ros/ros.h>
#include <std_msgs/Bool.h>

using namespace Aws::Client;
using namespace Aws::Kinesis;
Expand Down Expand Up @@ -86,7 +87,20 @@ KinesisManagerStatus StreamerNode::Initialize()
<< initialize_video_producer_result);
return initialize_video_producer_result;
}

// Set up command service server
srv_command_ = advertiseService("command", &StreamerNode::CommandCb, this);
AWS_LOGSTREAM_INFO(__func__, "Service server created: " << srv_command_.getService());
// Enable on start if needed
parameter_reader_->ReadParam(GetKinesisVideoParameter("enabled"), enabled_);
pub_enabled_ = advertise<std_msgs::Bool>("enabled", 1, true); // Latched
if (enabled_) {
KinesisManagerStatus status = InitializeStreamSubscriptions();
if (KINESIS_MANAGER_STATUS_FAILED(status)) {
return status;
}
}
PublishStatus(enabled_);

return KINESIS_MANAGER_STATUS_SUCCESS;
}

Expand All @@ -106,6 +120,65 @@ KinesisManagerStatus StreamerNode::InitializeStreamSubscriptions()
return KINESIS_MANAGER_STATUS_SUCCESS;
}

KinesisManagerStatus StreamerNode::UninitializeStreamSubscriptions()
{
// Unsubscribe and free all Kinesis streams to stop streaming
int video_stream_count = 0;
parameter_reader_->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_count), video_stream_count);
for (int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) {
std::string stream_name, topic_name;
parameter_reader_->ReadParam(GetStreamParameterPath(stream_idx, kStreamParameters.stream_name), stream_name);
parameter_reader_->ReadParam(GetStreamParameterPath(stream_idx, kStreamParameters.topic_name), topic_name);
subscription_installer_->Uninstall(topic_name);
stream_manager_->FreeStream(stream_name);
}
return KINESIS_MANAGER_STATUS_SUCCESS;
}

bool StreamerNode::CommandCb(kinesis_video_streamer::Command::Request &req, kinesis_video_streamer::Command::Response &res)
{
// Request node shutdown if start/stop streaming failed
if (!Command(req.enable)) {
ros::requestShutdown();
return false;
}
else
return true;
}

bool StreamerNode::Command(bool enable)
{
// Act only on mode switch
if (enable == enabled_)
return true;

KinesisManagerStatus status;
// Enable
if (enable) {
status = InitializeStreamSubscriptions(); // Can partially succeed, but returned status will be failed
}
// Disable
else {
status = UninitializeStreamSubscriptions();
}

// Set enabled if succeeded only
if (KINESIS_MANAGER_STATUS_SUCCEEDED(status)) {
enabled_ = enable;
PublishStatus(enabled_);
return true;
}
else
return false;
}

void StreamerNode::PublishStatus(bool enable)
{
std_msgs::Bool msg;
msg.data = enable;
pub_enabled_.publish(msg);
}

void StreamerNode::Spin()
{
uint32_t spinner_thread_count = kDefaultNumberOfSpinnerThreads;
Expand Down
4 changes: 4 additions & 0 deletions kinesis_video_streamer/srv/Command.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Command Kinesis video stream node

bool enable
---