From abd56c210d5f347d8d3b2801c61118c96141b4fe Mon Sep 17 00:00:00 2001 From: bartosz rzepa Date: Wed, 23 Oct 2024 09:01:25 +0200 Subject: [PATCH] added PLI handling --- compositor_pipeline/src/pipeline/encoder.rs | 12 ++++++- .../src/pipeline/encoder/ffmpeg_h264.rs | 6 ++-- compositor_pipeline/src/pipeline/output.rs | 19 ++++++++--- .../src/pipeline/output/whip.rs | 34 +++++++++++++++++-- integration_tests/examples/whip_client.rs | 4 +-- 5 files changed, 62 insertions(+), 13 deletions(-) diff --git a/compositor_pipeline/src/pipeline/encoder.rs b/compositor_pipeline/src/pipeline/encoder.rs index a770feb36..35cc64ece 100644 --- a/compositor_pipeline/src/pipeline/encoder.rs +++ b/compositor_pipeline/src/pipeline/encoder.rs @@ -100,6 +100,16 @@ impl Encoder { } } + pub fn request_keyframe(&self) -> Option> { + match self.video.as_ref() { + Some(VideoEncoder::H264(encoder)) => Some(encoder.request_keyframe().clone()), + None => { + error!("Non video encoder received keyframe request."); + None + } + } + } + pub fn samples_batch_sender(&self) -> Option<&Sender>> { match &self.audio { Some(encoder) => Some(encoder.samples_batch_sender()), @@ -138,7 +148,7 @@ impl VideoEncoder { } } - pub fn request_keyframe(&self) { + pub fn request_keyframe(&self) -> Sender<()> { match self { Self::H264(encoder) => encoder.request_keyframe(), } diff --git a/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs b/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs index c6ebe37f9..e2266224a 100644 --- a/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs +++ b/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs @@ -148,10 +148,8 @@ impl LibavH264Encoder { self.resolution } - pub fn request_keyframe(&self) { - if let Err(err) = self.keyframe_req_sender.send(()) { - debug!(%err, "Failed to send keyframe request to the encoder."); - } + pub fn request_keyframe(&self) -> crossbeam_channel::Sender<()> { + self.keyframe_req_sender.clone() } } diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index cd90f4eed..6b07e881a 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -3,6 +3,7 @@ use compositor_render::{ }; use crossbeam_channel::{bounded, Receiver, Sender}; use mp4::{Mp4FileWriter, Mp4OutputOptions}; +use tracing::debug; use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent}; @@ -121,8 +122,14 @@ impl OutputOptionsExt> for OutputOptions { Ok((Output::Mp4 { writer, encoder }, None)) } OutputProtocolOptions::Whip(whip_options) => { - let sender = whip::WhipSender::new(output_id, whip_options.clone(), packets, ctx) - .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; + let sender = whip::WhipSender::new( + output_id, + whip_options.clone(), + packets, + encoder.request_keyframe(), + ctx, + ) + .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; Ok((Output::Whip { sender, encoder }, None)) } @@ -222,11 +229,15 @@ impl Output { Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)), }; - encoder + if let Err(err) = encoder .video .as_ref() .ok_or(RequestKeyframeError::NoVideoOutput(output_id))? - .request_keyframe(); + .request_keyframe() + .send(()) + { + debug!(%err, "Failed to send keyframe request to the encoder."); + }; Ok(()) } diff --git a/compositor_pipeline/src/pipeline/output/whip.rs b/compositor_pipeline/src/pipeline/output/whip.rs index 0e0a07b81..3d6b743ca 100644 --- a/compositor_pipeline/src/pipeline/output/whip.rs +++ b/compositor_pipeline/src/pipeline/output/whip.rs @@ -1,9 +1,9 @@ use compositor_render::OutputId; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; use payloader::Payload; use reqwest::{header::HeaderMap, Url}; use std::sync::{atomic::AtomicBool, Arc}; -use tracing::{debug, error, info, span, Level}; +use tracing::{debug, error, info, span, warn, Level}; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, @@ -16,6 +16,7 @@ use webrtc::{ configuration::RTCConfiguration, sdp::session_description::RTCSessionDescription, RTCPeerConnection, }, + rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication, rtp_transceiver::{ rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType}, rtp_transceiver_direction::RTCRtpTransceiverDirection, @@ -59,6 +60,7 @@ impl WhipSender { output_id: &OutputId, options: WhipSenderOptions, packets_receiver: Receiver, + request_keyframe_sender: Option>, pipeline_ctx: &PipelineCtx, ) -> Result { let payloader = Payloader::new(options.video, options.audio); @@ -70,6 +72,7 @@ impl WhipSender { let output_id = output_id.clone(); let should_close2 = should_close.clone(); let event_emitter = pipeline_ctx.event_emitter.clone(); + let request_keyframe_sender = request_keyframe_sender.clone(); let tokio_rt = pipeline_ctx.tokio_rt.clone(); std::thread::Builder::new() @@ -86,6 +89,7 @@ impl WhipSender { bearer_token, should_close2, packet_stream, + request_keyframe_sender, tokio_rt, ); event_emitter.emit(Event::OutputDone(output_id)); @@ -112,6 +116,7 @@ fn start_whip_sender_thread( bearer_token: Option, should_close: Arc, packet_stream: PacketStream, + request_keyframe_sender: Option>, tokio_rt: Arc, ) { tokio_rt.block_on(async { @@ -124,6 +129,7 @@ fn start_whip_sender_thread( should_close.clone(), tokio_rt.clone(), client.clone(), + request_keyframe_sender, ) .await { @@ -246,6 +252,7 @@ async fn connect( should_close: Arc, tokio_rt: Arc, client: reqwest::Client, + request_keyframe_sender: Option>, ) -> Result { peer_connection.on_ice_connection_state_change(Box::new( move |connection_state: RTCIceConnectionState| { @@ -260,6 +267,29 @@ async fn connect( }, )); + if let Some(keyframe_sender) = request_keyframe_sender { + let senders = peer_connection.get_senders().await; + for sender in senders { + let keyframe_sender_clone = keyframe_sender.clone(); + tokio_rt.spawn(async move { + loop { + let packets = &sender.read_rtcp().await.unwrap().0; + for packet in packets { + if packet + .as_any() + .downcast_ref::() + .is_some() + { + if let Err(err) = keyframe_sender_clone.send(()) { + debug!(%err, "Failed to send keyframe request to the encoder."); + }; + } + } + } + }); + } + } + let offer = peer_connection.create_offer(None).await.unwrap(); info!("[WHIP] endpoint url: {}", endpoint_url); diff --git a/integration_tests/examples/whip_client.rs b/integration_tests/examples/whip_client.rs index 370036d67..d98fda733 100644 --- a/integration_tests/examples/whip_client.rs +++ b/integration_tests/examples/whip_client.rs @@ -33,7 +33,7 @@ fn client_code() -> Result<()> { &json!({ "type": "whip", "endpoint_url": "https://g.webrtc.live-video.net:4443/v2/offer", - "bearer_token": "", // your Bearer token + "bearer_token": "live_415462268_d64Q7l1UrCnKC2nvacJ3dMZaxcITq5", // your Bearer token "video": { "resolution": { "width": VIDEO_RESOLUTION.width, @@ -41,7 +41,7 @@ fn client_code() -> Result<()> { }, "encoder": { "type": "ffmpeg_h264", - "preset": "ultrafast" + "preset": "medium" }, "initial": { "root": {