Skip to content

Commit

Permalink
added PLI handling
Browse files Browse the repository at this point in the history
  • Loading branch information
brzep committed Oct 23, 2024
1 parent c893415 commit abd56c2
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 13 deletions.
12 changes: 11 additions & 1 deletion compositor_pipeline/src/pipeline/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ impl Encoder {
}
}

pub fn request_keyframe(&self) -> Option<Sender<()>> {
match self.video.as_ref() {
Some(VideoEncoder::H264(encoder)) => Some(encoder.request_keyframe().clone()),
None => {
error!("Non video encoder received keyframe request.");
None
}
}
}

pub fn samples_batch_sender(&self) -> Option<&Sender<PipelineEvent<OutputSamples>>> {
match &self.audio {
Some(encoder) => Some(encoder.samples_batch_sender()),
Expand Down Expand Up @@ -138,7 +148,7 @@ impl VideoEncoder {
}
}

pub fn request_keyframe(&self) {
pub fn request_keyframe(&self) -> Sender<()> {
match self {
Self::H264(encoder) => encoder.request_keyframe(),
}
Expand Down
6 changes: 2 additions & 4 deletions compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,8 @@ impl LibavH264Encoder {
self.resolution
}

pub fn request_keyframe(&self) {
if let Err(err) = self.keyframe_req_sender.send(()) {
debug!(%err, "Failed to send keyframe request to the encoder.");
}
pub fn request_keyframe(&self) -> crossbeam_channel::Sender<()> {
self.keyframe_req_sender.clone()
}
}

Expand Down
19 changes: 15 additions & 4 deletions compositor_pipeline/src/pipeline/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use compositor_render::{
};
use crossbeam_channel::{bounded, Receiver, Sender};
use mp4::{Mp4FileWriter, Mp4OutputOptions};
use tracing::debug;

use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent};

Expand Down Expand Up @@ -121,8 +122,14 @@ impl OutputOptionsExt<Option<Port>> for OutputOptions {
Ok((Output::Mp4 { writer, encoder }, None))
}
OutputProtocolOptions::Whip(whip_options) => {
let sender = whip::WhipSender::new(output_id, whip_options.clone(), packets, ctx)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;
let sender = whip::WhipSender::new(
output_id,
whip_options.clone(),
packets,
encoder.request_keyframe(),
ctx,
)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;

Ok((Output::Whip { sender, encoder }, None))
}
Expand Down Expand Up @@ -222,11 +229,15 @@ impl Output {
Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)),
};

encoder
if let Err(err) = encoder
.video
.as_ref()
.ok_or(RequestKeyframeError::NoVideoOutput(output_id))?
.request_keyframe();
.request_keyframe()
.send(())
{
debug!(%err, "Failed to send keyframe request to the encoder.");
};

Ok(())
}
Expand Down
34 changes: 32 additions & 2 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use compositor_render::OutputId;
use crossbeam_channel::Receiver;
use crossbeam_channel::{Receiver, Sender};
use payloader::Payload;
use reqwest::{header::HeaderMap, Url};
use std::sync::{atomic::AtomicBool, Arc};
use tracing::{debug, error, info, span, Level};
use tracing::{debug, error, info, span, warn, Level};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors,
Expand All @@ -16,6 +16,7 @@ use webrtc::{
configuration::RTCConfiguration, sdp::session_description::RTCSessionDescription,
RTCPeerConnection,
},
rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication,
rtp_transceiver::{
rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType},
rtp_transceiver_direction::RTCRtpTransceiverDirection,
Expand Down Expand Up @@ -59,6 +60,7 @@ impl WhipSender {
output_id: &OutputId,
options: WhipSenderOptions,
packets_receiver: Receiver<EncoderOutputEvent>,
request_keyframe_sender: Option<Sender<()>>,
pipeline_ctx: &PipelineCtx,
) -> Result<Self, OutputInitError> {
let payloader = Payloader::new(options.video, options.audio);
Expand All @@ -70,6 +72,7 @@ impl WhipSender {
let output_id = output_id.clone();
let should_close2 = should_close.clone();
let event_emitter = pipeline_ctx.event_emitter.clone();
let request_keyframe_sender = request_keyframe_sender.clone();
let tokio_rt = pipeline_ctx.tokio_rt.clone();

std::thread::Builder::new()
Expand All @@ -86,6 +89,7 @@ impl WhipSender {
bearer_token,
should_close2,
packet_stream,
request_keyframe_sender,
tokio_rt,
);
event_emitter.emit(Event::OutputDone(output_id));
Expand All @@ -112,6 +116,7 @@ fn start_whip_sender_thread(
bearer_token: Option<String>,
should_close: Arc<AtomicBool>,
packet_stream: PacketStream,
request_keyframe_sender: Option<Sender<()>>,
tokio_rt: Arc<tokio::runtime::Runtime>,
) {
tokio_rt.block_on(async {
Expand All @@ -124,6 +129,7 @@ fn start_whip_sender_thread(
should_close.clone(),
tokio_rt.clone(),
client.clone(),
request_keyframe_sender,
)
.await
{
Expand Down Expand Up @@ -246,6 +252,7 @@ async fn connect(
should_close: Arc<AtomicBool>,
tokio_rt: Arc<tokio::runtime::Runtime>,
client: reqwest::Client,
request_keyframe_sender: Option<Sender<()>>,
) -> Result<Url, WhipError> {
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
Expand All @@ -260,6 +267,29 @@ async fn connect(
},
));

if let Some(keyframe_sender) = request_keyframe_sender {
let senders = peer_connection.get_senders().await;
for sender in senders {
let keyframe_sender_clone = keyframe_sender.clone();
tokio_rt.spawn(async move {
loop {
let packets = &sender.read_rtcp().await.unwrap().0;
for packet in packets {
if packet
.as_any()
.downcast_ref::<PictureLossIndication>()
.is_some()
{
if let Err(err) = keyframe_sender_clone.send(()) {
debug!(%err, "Failed to send keyframe request to the encoder.");
};
}
}
}
});
}
}

let offer = peer_connection.create_offer(None).await.unwrap();

info!("[WHIP] endpoint url: {}", endpoint_url);
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/examples/whip_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ fn client_code() -> Result<()> {
&json!({
"type": "whip",
"endpoint_url": "https://g.webrtc.live-video.net:4443/v2/offer",
"bearer_token": "", // your Bearer token
"bearer_token": "live_415462268_d64Q7l1UrCnKC2nvacJ3dMZaxcITq5", // your Bearer token
"video": {
"resolution": {
"width": VIDEO_RESOLUTION.width,
"height": VIDEO_RESOLUTION.height,
},
"encoder": {
"type": "ffmpeg_h264",
"preset": "ultrafast"
"preset": "medium"
},
"initial": {
"root": {
Expand Down

0 comments on commit abd56c2

Please sign in to comment.