Skip to content

Commit

Permalink
Add prints to metadata_syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
OhadMeir committed Dec 30, 2024
1 parent e18af41 commit 4f1c402
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/dds/rs-dds-sensor-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ void dds_sensor_proxy::handle_video_data( realdds::topics::image_msg && dds_fram

if( _md_enabled )
{
LOG_INFO( "dds_sensor_proxy::handle_video_data calling enqueue_frame for type " << vid_profile->get_stream_type());
streaming.syncer.enqueue_frame( dds_frame.timestamp().to_ns(), streaming.syncer.hold( new_frame ) );
}
else
Expand Down Expand Up @@ -486,14 +487,15 @@ void dds_sensor_proxy::start( rs2_frame_callback_sptr callback )
auto & streaming = _streaming_by_name[dds_stream->name()];
streaming.syncer.on_frame_release( frame_releaser );
streaming.syncer.on_frame_ready(
[this, &streaming]( syncer_type::frame_holder && fh, std::shared_ptr< const json > const & md )
[this, &streaming, &dds_stream]( syncer_type::frame_holder && fh, std::shared_ptr< const json > const & md )
{
if( _is_streaming ) // stop was not called
{
if( ! md )
add_no_metadata( static_cast< frame * >( fh.get() ), streaming );
else
add_frame_metadata( static_cast< frame * >( fh.get() ), *md, streaming );
LOG_INFO( dds_stream->name() << " calling invoke_new_frame, last frame number " << streaming.last_frame_number );
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr );
}
} );
Expand Down
28 changes: 26 additions & 2 deletions third-party/realdds/src/dds-metadata-syncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,34 @@ dds_metadata_syncer::dds_metadata_syncer()
: _is_alive( std::make_shared< bool >( true ) )
, _on_frame_release( nullptr )
{
LOG_INFO( "dds_metadata_syncer::dds_metadata_syncer" );
}


dds_metadata_syncer::~dds_metadata_syncer()
{
LOG_INFO( "dds_metadata_syncer::~dds_metadata_syncer" );
_is_alive.reset();

std::lock_guard< std::mutex > lock( _queues_lock );
LOG_INFO( "dds_metadata_syncer::~dds_metadata_syncer got lock" );
_frame_queue.clear();
_metadata_queue.clear();
}


void dds_metadata_syncer::enqueue_frame( key_type id, frame_holder && frame )
{
LOG_INFO( "dds_metadata_syncer::enqueue_frame" );
std::weak_ptr< bool > alive = _is_alive;
if( ! alive.lock() ) // Check if was destructed by another thread
if( !alive.lock() ) // Check if was destructed by another thread
{
LOG_INFO( "dds_metadata_syncer::enqueue_frame not alive" );
return;
}

std::unique_lock< std::mutex > lock( _queues_lock );
LOG_INFO( "dds_metadata_syncer::enqueue_frame got lock" );
// Expect increasing order
if( ! _frame_queue.empty() && _frame_queue.back().first >= id )
DDS_THROW( runtime_error, "frame " << id << " cannot be enqueued after " << _frame_queue.back().first );
Expand Down Expand Up @@ -105,6 +113,8 @@ void dds_metadata_syncer::search_for_match( std::unique_lock< std::mutex > & loc

bool dds_metadata_syncer::handle_match( std::unique_lock< std::mutex > & lock )
{
LOG_INFO( "dds_metadata_syncer::handle_match _frame_queue size " << _frame_queue.size() <<
" _metadata_queue size = " << _metadata_queue.size() );
std::weak_ptr< bool > alive = _is_alive;

frame_holder fh = std::move( _frame_queue.front().second );
Expand All @@ -114,10 +124,16 @@ bool dds_metadata_syncer::handle_match( std::unique_lock< std::mutex > & lock )

if( _on_frame_ready )
{
LOG_INFO( "dds_metadata_syncer::handle_match opening lock" );
lock.unlock();
LOG_INFO( "dds_metadata_syncer::handle_match calling _on_frame_ready" );
_on_frame_ready( std::move( fh ), md );
if( ! alive.lock() ) // Check if was destructed by another thread during callback
if( !alive.lock() ) // Check if was destructed by another thread during callback
{
LOG_INFO( "dds_metadata_syncer::handle_match not alive" );
return false;
}
LOG_INFO( "dds_metadata_syncer::handle_match locking lock" );
lock.lock();
}

Expand All @@ -127,17 +143,25 @@ bool dds_metadata_syncer::handle_match( std::unique_lock< std::mutex > & lock )

bool dds_metadata_syncer::handle_frame_without_metadata( std::unique_lock< std::mutex > & lock )
{
LOG_INFO( "dds_metadata_syncer::handle_frame_without_metadata _frame_queue size " << _frame_queue.size() <<
" _metadata_queue size = " << _metadata_queue.size() );
std::weak_ptr< bool > alive = _is_alive;

frame_holder fh = std::move( _frame_queue.front().second );
_frame_queue.pop_front();

if( _on_frame_ready )
{
LOG_INFO( "dds_metadata_syncer::handle_frame_without_metadata opening lock" );
lock.unlock();
LOG_INFO( "dds_metadata_syncer::handle_frame_without_metadata calling _on_frame_ready" );
_on_frame_ready( std::move( fh ), metadata_type() );
if( ! alive.lock() ) // Check if was destructed by another thread during callback
{
LOG_INFO( "dds_metadata_syncer::handle_frame_without_metadata not alive" );
return false;
}
LOG_INFO( "dds_metadata_syncer::handle_frame_without_metadata locking lock" );
lock.lock();
}

Expand Down

0 comments on commit 4f1c402

Please sign in to comment.