Skip to content

Commit

Permalink
Option to configure input latency (#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
wkozyra95 authored Jun 28, 2024
1 parent 868a428 commit 42bc051
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 21 deletions.
2 changes: 1 addition & 1 deletion compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct Port(pub u16);

pub struct RegisterInputOptions {
pub input_options: InputOptions,
pub queue_options: queue::InputOptions,
pub queue_options: queue::QueueInputOptions,
}

#[derive(Debug, Clone)]
Expand Down
47 changes: 39 additions & 8 deletions compositor_pipeline/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use self::{
video_queue::VideoQueue,
};

const DEFAULT_BUFFER_DURATION: Duration = Duration::from_millis(16 * 5); // about 5 frames at 60 fps
pub const DEFAULT_BUFFER_DURATION: Duration = Duration::from_millis(16 * 5); // about 5 frames at 60 fps
const DEFAULT_AUDIO_CHUNK_DURATION: Duration = Duration::from_millis(20); // typical audio packet size

/// Queue is responsible for consuming frames from different inputs and producing
Expand Down Expand Up @@ -56,6 +56,8 @@ pub struct Queue {
/// false - Event will be discarded.
run_late_scheduled_events: bool,

default_buffer_duration: Duration,

start_sender: Mutex<Option<Sender<QueueStartEvent>>>,
scheduled_event_sender: Sender<ScheduledEvent>,

Expand Down Expand Up @@ -108,15 +110,33 @@ impl From<QueueAudioOutput> for InputSamplesSet {
}

#[derive(Debug, Clone, Copy)]
pub struct InputOptions {
pub struct QueueInputOptions {
pub required: bool,
/// Relative offset this input stream should have to the clock that
/// starts when pipeline is started.
pub offset: Option<Duration>,

/// Duration of stream that should be buffered before stream is started.
/// If you have both audio and video streams then make sure to use the same value
/// to avoid desync.
///
/// This value defines minimal latency on the queue, but if you set it to low and fail
/// to deliver the input stream on time it can cause either black screen or flickering image.
///
/// By default DEFAULT_BUFFER_DURATION will be used.
pub buffer_duration: Option<Duration>,
}

#[derive(Debug, Clone, Copy)]
struct InputOptions {
required: bool,
offset: Option<Duration>,
buffer_duration: Duration,
}

#[derive(Debug, Clone, Copy)]
pub struct QueueOptions {
pub default_buffer_duration: Duration,
pub ahead_of_time_processing: bool,
pub output_framerate: Framerate,
pub run_late_scheduled_events: bool,
Expand Down Expand Up @@ -147,19 +167,19 @@ impl Queue {
pub fn new(opts: QueueOptions) -> Arc<Self> {
let (queue_start_sender, queue_start_receiver) = bounded(0);
let (scheduled_event_sender, scheduled_event_receiver) = bounded(0);
let buffer_duration = DEFAULT_BUFFER_DURATION;
let queue = Arc::new(Queue {
video_queue: Mutex::new(VideoQueue::new(buffer_duration)),
video_queue: Mutex::new(VideoQueue::new()),
output_framerate: opts.output_framerate,

audio_queue: Mutex::new(AudioQueue::new(buffer_duration)),
audio_queue: Mutex::new(AudioQueue::new()),
audio_chunk_duration: DEFAULT_AUDIO_CHUNK_DURATION,

scheduled_event_sender,
start_sender: Mutex::new(Some(queue_start_sender)),
ahead_of_time_processing: opts.ahead_of_time_processing,
never_drop_output_frames: opts.never_drop_output_frames,
run_late_scheduled_events: opts.run_late_scheduled_events,
default_buffer_duration: opts.default_buffer_duration,

clock: Clock::new(),
});
Expand All @@ -174,20 +194,31 @@ impl Queue {
queue
}

pub fn add_input(&self, input_id: &InputId, receiver: DecodedDataReceiver, opts: InputOptions) {
pub fn add_input(
&self,
input_id: &InputId,
receiver: DecodedDataReceiver,
opts: QueueInputOptions,
) {
let input_options = InputOptions {
required: opts.required,
offset: opts.offset,
buffer_duration: opts.buffer_duration.unwrap_or(self.default_buffer_duration),
};

if let Some(receiver) = receiver.video {
self.video_queue.lock().unwrap().add_input(
input_id,
receiver,
opts,
input_options,
self.clock.clone(),
);
};
if let Some(receiver) = receiver.audio {
self.audio_queue.lock().unwrap().add_input(
input_id,
receiver,
opts,
input_options,
self.clock.clone(),
);
}
Expand Down
6 changes: 2 additions & 4 deletions compositor_pipeline/src/queue/audio_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ use crossbeam_channel::{Receiver, TryRecvError};
#[derive(Debug)]
pub struct AudioQueue {
inputs: HashMap<InputId, AudioQueueInput>,
buffer_duration: Duration,
}

impl AudioQueue {
pub fn new(buffer_duration: Duration) -> Self {
pub fn new() -> Self {
AudioQueue {
inputs: HashMap::new(),
buffer_duration,
}
}

Expand All @@ -41,7 +39,7 @@ impl AudioQueue {
queue: VecDeque::new(),
receiver,
input_samples_processor: InputProcessor::new(
self.buffer_duration,
opts.buffer_duration,
clock,
input_id.clone(),
),
Expand Down
6 changes: 2 additions & 4 deletions compositor_pipeline/src/queue/video_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ use super::QueueVideoOutput;

pub struct VideoQueue {
inputs: HashMap<InputId, VideoQueueInput>,
buffer_duration: Duration,
}

impl VideoQueue {
pub fn new(buffer_duration: Duration) -> Self {
pub fn new() -> Self {
VideoQueue {
inputs: HashMap::new(),
buffer_duration,
}
}

Expand All @@ -44,7 +42,7 @@ impl VideoQueue {
queue: VecDeque::new(),
receiver,
input_frames_processor: InputProcessor::new(
self.buffer_duration,
opts.buffer_duration,
clock,
input_id.clone(),
),
Expand Down
15 changes: 15 additions & 0 deletions docs/pages/deployment/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,18 @@ Defaults to `UNIFORM_BUFFER_AND_STORAGE_TEXTURE_ARRAY_NON_UNIFORM_INDEXING,SAMPL

Additionally, `TEXTURE_BINDING_ARRAY` and `PUSH_CONSTANTS` are also required, but this requirement can not be overwritten by changing this
environment variable.

### `LIVE_COMPOSITOR_INPUT_BUFFER_DURATION_MS`

Duration of an input buffer in milliseconds. New stream will not be processed until this buffer is filled, so this value controls the trade-off between
latency and resilience to stream delays.

This value can be safely set to 0 if either:
- All input streams are `required`
- All input streams are started with a specific `offset_ms` and you are delivering them early enough for decoding to finish.

:::warning
Increasing this value always increases the latency of the stream by the same amount.
:::

Defaults to `80ms` (about 5 frames in 60 fps).
14 changes: 13 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, path::PathBuf, str::FromStr, time::Duration};

use compositor_pipeline::queue::QueueOptions;
use compositor_pipeline::queue::{self, QueueOptions};
use compositor_render::{web_renderer::WebRendererInitOptions, Framerate, WgpuFeatures};
use rand::Rng;

Expand Down Expand Up @@ -171,6 +171,17 @@ fn try_read_config() -> Result<Config, String> {
Err(_) => default_wgpu_features,
};

let default_buffer_duration = match env::var("LIVE_COMPOSITOR_INPUT_BUFFER_DURATION_MS") {
Ok(duration) => match duration.parse::<f64>() {
Ok(duration) => Duration::from_secs_f64(duration / 1000.0),
Err(_) => {
println!("CONFIG ERROR: Invalid value provided for \"LIVE_COMPOSITOR_INPUT_BUFFER_DURATION_MS\". Falling back to default value {:?}.", queue::DEFAULT_BUFFER_DURATION);
queue::DEFAULT_BUFFER_DURATION
}
},
Err(_) => queue::DEFAULT_BUFFER_DURATION,
};

let config = Config {
instance_id,
api_port,
Expand All @@ -180,6 +191,7 @@ fn try_read_config() -> Result<Config, String> {
level: logger_level,
},
queue_options: QueueOptions {
default_buffer_duration,
ahead_of_time_processing,
output_framerate: framerate,
run_late_scheduled_events,
Expand Down
9 changes: 6 additions & 3 deletions src/types/from_register_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ impl TryFrom<RtpInputStream> for pipeline::RegisterInputOptions {
transport_protocol: transport_protocol.unwrap_or(TransportProtocol::Udp).into(),
});

let queue_options = queue::InputOptions {
let queue_options = queue::QueueInputOptions {
required: required.unwrap_or(false),
offset: offset_ms.map(|offset_ms| Duration::from_secs_f64(offset_ms / 1000.0)),
buffer_duration: None,
};

Ok(pipeline::RegisterInputOptions {
Expand Down Expand Up @@ -152,9 +153,10 @@ impl TryFrom<Mp4> for pipeline::RegisterInputOptions {
(None, Some(path)) => input::mp4::Source::File(path.into()),
};

let queue_options = queue::InputOptions {
let queue_options = queue::QueueInputOptions {
required: required.unwrap_or(false),
offset: offset_ms.map(|offset_ms| Duration::from_secs_f64(offset_ms / 1000.0)),
buffer_duration: None,
};

Ok(pipeline::RegisterInputOptions {
Expand All @@ -175,9 +177,10 @@ impl TryFrom<DeckLink> for pipeline::RegisterInputOptions {
display_name: value.display_name,
enable_audio: value.enable_audio.unwrap_or(true),
}),
queue_options: queue::InputOptions {
queue_options: queue::QueueInputOptions {
required: value.required.unwrap_or(false),
offset: None,
buffer_duration: Some(Duration::from_millis(5)),
},
})
}
Expand Down

0 comments on commit 42bc051

Please sign in to comment.