Skip to content

Commit

Permalink
Don't send receiver reports during simulcast probe
Browse files Browse the repository at this point in the history
libwebrtc will stop sending SDES headers after the first receiver
report on a SSRC. This breaks probing if the receiver report is sent
before probing is completed. This change delays receiver reports for a
SSRC until the first non-probe packet is received.
  • Loading branch information
anders-avos committed Feb 3, 2025
1 parent ffead9f commit 2decea1
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 12 deletions.
3 changes: 3 additions & 0 deletions interceptor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub mod twcc;

pub use error::Error;

/// Attribute indicating the stream is probing incoming packets.
pub const ATTR_READ_PROBE: usize = 2295978936;

/// Attributes are a generic key/value store used by interceptors
pub type Attributes = HashMap<usize, usize>;

Expand Down
16 changes: 11 additions & 5 deletions interceptor/src/report/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ impl ReceiverReport {
m.values().cloned().collect()
};
for stream in streams {
let pkt = stream.generate_report(now);

let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
log::warn!("failed sending: {}", err);
if let Some(pkt) = stream.generate_report(now) {
let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
log::warn!("failed sending: {}", err);
}
}
}
}
Expand Down Expand Up @@ -186,11 +186,17 @@ impl Interceptor for ReceiverReport {
info: &StreamInfo,
reader: Arc<dyn RTPReader + Send + Sync>,
) -> Arc<dyn RTPReader + Send + Sync> {
let wait_for_probe = info
.attributes
.get(&crate::ATTR_READ_PROBE)
.is_some_and(|v| *v != 0);

let stream = Arc::new(ReceiverStream::new(
info.ssrc,
info.clock_rate,
reader,
self.internal.now.clone(),
wait_for_probe,
));
{
let mut streams = self.internal.streams.lock().await;
Expand Down
26 changes: 20 additions & 6 deletions interceptor/src/report/receiver/receiver_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct ReceiverStreamInternal {

packets: Vec<u64>,
started: bool,
wait_for_probe: bool,
seq_num_cycles: u16,
last_seq_num: i32,
last_report_seq_num: i32,
Expand Down Expand Up @@ -40,7 +41,7 @@ impl ReceiverStreamInternal {
(self.packets[pos / 64] & (1 << (pos % 64))) != 0
}

fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) {
fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
if !self.started {
// first frame
self.started = true;
Expand Down Expand Up @@ -79,6 +80,7 @@ impl ReceiverStreamInternal {

self.last_rtp_time_rtp = pkt.header.timestamp;
self.last_rtp_time_time = now;
self.wait_for_probe &= is_probe;
}

fn process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport) {
Expand Down Expand Up @@ -158,6 +160,7 @@ impl ReceiverStream {
clock_rate: u32,
reader: Arc<dyn RTPReader + Send + Sync>,
now: Option<FnTimeGen>,
wait_for_probe: bool,
) -> Self {
let receiver_ssrc = rand::random::<u32>();
ReceiverStream {
Expand All @@ -171,6 +174,7 @@ impl ReceiverStream {

packets: vec![0u64; 128],
started: false,
wait_for_probe,
seq_num_cycles: 0,
last_seq_num: 0,
last_report_seq_num: 0,
Expand All @@ -184,9 +188,9 @@ impl ReceiverStream {
}
}

pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
let mut internal = self.internal.lock();
internal.process_rtp(now, pkt);
internal.process_rtp(now, pkt, is_probe);
}

pub(crate) fn process_sender_report(
Expand All @@ -198,9 +202,17 @@ impl ReceiverStream {
internal.process_sender_report(now, sr);
}

pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
pub(crate) fn generate_report(
&self,
now: SystemTime,
) -> Option<rtcp::receiver_report::ReceiverReport> {
let mut internal = self.internal.lock();
internal.generate_report(now)

if internal.wait_for_probe {
return None;
}

Some(internal.generate_report(now))
}
}

Expand All @@ -213,14 +225,16 @@ impl RTPReader for ReceiverStream {
buf: &mut [u8],
a: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
let is_probe = a.get(&crate::ATTR_READ_PROBE).is_some_and(|v| *v != 0);

let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;

let now = if let Some(f) = &self.now {
f()
} else {
SystemTime::now()
};
self.process_rtp(now, &pkt);
self.process_rtp(now, &pkt, is_probe);

Ok((pkt, attr))
}
Expand Down
67 changes: 67 additions & 0 deletions interceptor/src/report/receiver/receiver_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,73 @@ async fn test_receiver_interceptor_before_any_packet() -> Result<()> {
Ok(())
}

#[tokio::test(start_paused = true)]
async fn test_receiver_interceptor_read_probe() -> Result<()> {
let mt = Arc::new(MockTime::default());
let time_gen = {
let mt = Arc::clone(&mt);
Arc::new(move || mt.now())
};

let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
.with_interval(Duration::from_millis(50))
.with_now_fn(time_gen)
.build("")?;

let stream = MockStream::new(
&StreamInfo {
ssrc: 123456,
clock_rate: 90000,
attributes: [(crate::ATTR_READ_PROBE, 1)].into_iter().collect(),
..Default::default()
},
icpr,
)
.await;

// no report initially
tokio::time::timeout(Duration::from_millis(60), stream.written_rtcp())
.await
.expect_err("expected no report");

stream
.receive_rtp(rtp::packet::Packet {
header: rtp::header::Header {
sequence_number: 7,
..Default::default()
},
..Default::default()
})
.await;

let pkts = stream.written_rtcp().await.unwrap();
assert_eq!(pkts.len(), 1);
if let Some(rr) = pkts[0]
.as_any()
.downcast_ref::<rtcp::receiver_report::ReceiverReport>()
{
assert_eq!(rr.reports.len(), 1);
assert_eq!(
rr.reports[0],
rtcp::reception_report::ReceptionReport {
ssrc: 123456,
last_sequence_number: 7,
last_sender_report: 0,
fraction_lost: 0,
total_lost: 0,
delay: 0,
jitter: 0,
}
)
} else {
panic!();
}

stream.close().await?;

Ok(())
}

#[tokio::test]
async fn test_receiver_interceptor_after_rtp_packets() -> Result<()> {
let mt = Arc::new(MockTime::default());
Expand Down
8 changes: 7 additions & 1 deletion webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,14 +1060,20 @@ impl PeerConnectionInternal {
None => return Err(Error::ErrInterceptorNotBind),
};

let stream_info = create_stream_info(
let mut stream_info = create_stream_info(
"".to_owned(),
ssrc,
params.codecs[0].payload_type,
params.codecs[0].capability.clone(),
&params.header_extensions,
None,
);

// indicate this stream starts with probing
stream_info
.attributes
.insert(interceptor::ATTR_READ_PROBE, 1);

let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
.dtls_transport
.streams_for_ssrc(ssrc, &stream_info, &icpr)
Expand Down

0 comments on commit 2decea1

Please sign in to comment.