Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into fix/segment_duration_module
Browse files Browse the repository at this point in the history
  • Loading branch information
blazpie authored May 23, 2023
2 parents 667791b + cccaa89 commit f8fdfcb
Show file tree
Hide file tree
Showing 10 changed files with 328 additions and 32 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The package can be installed by adding `membrane_rtc_engine` to your list of dep
```elixir
def deps do
[
{:membrane_rtc_engine, "~> 0.12.0"}
{:membrane_rtc_engine, "~> 0.12.1"}
]
end
```
Expand Down
1 change: 1 addition & 0 deletions lib/membrane_rtc_engine/endpoints/hls_endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ if Enum.all?(
```
[
:membrane_h264_ffmpeg_plugin,
:membrane_h264_plugin,
:membrane_http_adaptive_stream_plugin,
:membrane_opus_plugin,
:membrane_aac_plugin,
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane_rtc_engine/endpoints/rtsp_endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ if Enum.all?(
@max_reconnect_attempts 3
@reconnect_delay 15_000
@keep_alive_interval 15_000
# Increased buffer size helps ensure the stability of transmission and eliminate video artefacts
@recv_buffer_size 1024 * 1024

def_output_pad :output,
demand_unit: :buffers,
Expand Down Expand Up @@ -273,7 +275,8 @@ if Enum.all?(
structure = [
child(:udp_source, %Membrane.UDP.Source{
local_port_no: state.rtp_port,
pierce_nat_ctx: pierce_nat_ctx
pierce_nat_ctx: pierce_nat_ctx,
recv_buffer_size: @recv_buffer_size
})
|> via_in(Pad.ref(:rtp_input, make_ref()))
|> child(:rtp, %Membrane.RTP.SessionBin{
Expand Down
23 changes: 16 additions & 7 deletions lib/membrane_rtc_engine/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,19 @@ defmodule Membrane.RTC.Engine do
|> Map.keys()
|> Enum.map(&{:notify_child, {{:endpoint, &1}, {:new_peer, peer}}})

active_tracks = get_active_tracks(state.endpoints)

new_tracks_notification =
if active_tracks == [] do
[]
else
[notify_child: {{:endpoint, endpoint_id}, {:new_tracks, active_tracks}}]
end

actions =
[
notify_child: {{:endpoint, endpoint_id}, {:ready, peers_in_room}},
notify_child:
{{:endpoint, endpoint_id}, {:new_tracks, get_active_tracks(state.endpoints)}}
] ++ new_peer_notifications
notify_child: {{:endpoint, endpoint_id}, {:ready, peers_in_room}}
] ++ new_tracks_notification ++ new_peer_notifications

state =
state
Expand Down Expand Up @@ -850,11 +857,13 @@ defmodule Membrane.RTC.Engine do
else: []

# Only inform about the tracks if we're not taking about a peer
active_tracks = get_active_tracks(state.endpoints)

tracks_actions =
if is_peer? do
[]
if not is_peer? and active_tracks != [] do
[notify_child: {endpoint_name, {:new_tracks, active_tracks}}]
else
[notify_child: {endpoint_name, {:new_tracks, get_active_tracks(state.endpoints)}}]
[]
end

actions = [spec: spec] ++ display_manager_message ++ tracks_actions
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTC.Engine.MixProject do
use Mix.Project

@version "0.12.0"
@version "0.12.1"
@github_url "https://github.com/jellyfish-dev/membrane_rtc_engine"

def project do
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"membrane_udp_plugin": {:hex, :membrane_udp_plugin, "0.9.2", "b3e3f1026cbff45b80c9ff5f18206c95e5e2df598111b9a1264fb6e4cb62097d", [:mix], [{:membrane_core, "~> 0.11.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "f542452e503be5ae4cbb2484c9e4454fbd60b3fb35a71cb7fe8099460b5a5a0d"},
"membrane_video_compositor_plugin": {:hex, :membrane_video_compositor_plugin, "0.3.1", "be7746462791538d592f4d76df33bba587b7d8345bf310cc9794a9199b06d154", [:mix], [{:membrane_core, "~> 0.11.2", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_framerate_converter_plugin, "~> 0.6.1", [hex: :membrane_framerate_converter_plugin, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:rustler, "~> 0.26.0", [hex: :rustler, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "c4b2efca02daadef0084737faedc77a14593ee2f2195eafa70754f9c02796af4"},
"membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"},
"membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.14.4", "308a1574cfc0b582fc7fecbda28aec035765bb4ff4c79cd6e0ee1014128b54a3", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, ">= 0.0.0", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.11.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.2", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.7.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_ffmpeg_plugin, "~> 0.26.2", [hex: :membrane_h264_ffmpeg_plugin, repo: "hexpm", optional: false]}, {:membrane_ice_plugin, "~> 0.14.3", [hex: :membrane_ice_plugin, repo: "hexpm", optional: false]}, {:membrane_opentelemetry, "~> 0.1.0", [hex: :membrane_opentelemetry, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.6.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.15.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, "~> 0.7.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.22.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.7.1", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0.4", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.0", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "32df7912c921db52298d4f548ac1a1faf1aae8114f73095c823f9deefa1ea012"},
"membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.14.5", "ff00079c6ced6d0dfe7f0520a5c9df7230547ef5673fd811072d9692f0177220", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, ">= 0.0.0", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.11.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.2", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.7.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_ffmpeg_plugin, "~> 0.26.2", [hex: :membrane_h264_ffmpeg_plugin, repo: "hexpm", optional: false]}, {:membrane_ice_plugin, "~> 0.14.3", [hex: :membrane_ice_plugin, repo: "hexpm", optional: false]}, {:membrane_opentelemetry, "~> 0.1.0", [hex: :membrane_opentelemetry, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.6.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.15.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, "~> 0.7.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.22.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.7.1", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0.4", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.0", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "ee93ded061843a1cfce6e1d78fb648c351f9771033babce59e69a93876bb3338"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"},
Expand Down
2 changes: 2 additions & 0 deletions test/membrane_rtc_engine/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule Membrane.RTC.EngineTest do

assert_receive {:new_peer, %Peer{id: "peer", metadata: "metadata"}}
assert_receive {:ready, []}
refute_receive {:new_tracks, []}
end

test "is ignored for non-peers", %{rtc_engine: rtc_engine} do
Expand All @@ -52,6 +53,7 @@ defmodule Membrane.RTC.EngineTest do

refute_receive {:new_peer, _peer}
refute_receive {:ready, _peers_in_room}
refute_receive {:new_tracks, []}
end

test "reports other peers", %{rtc_engine: rtc_engine} do
Expand Down
6 changes: 3 additions & 3 deletions test/membrane_rtc_engine/hls_endpoint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Membrane.RTC.HLSEndpointTest do
@fixtures_dir "./test/fixtures/"
@reference_dir "./test/hls_reference/"
@main_manifest "index.m3u8"
@accaptable_bandwidth_diff 500_000
@acceptable_bandwidth_diff 500_000

setup do
options = [
Expand Down Expand Up @@ -546,8 +546,8 @@ defmodule Membrane.RTC.HLSEndpointTest do
output_bandwidth = String.to_integer(output_bandwidth)
reference_bandwidth = String.to_integer(reference_bandwidth)

assert output_bandwidth + @accaptable_bandwidth_diff > reference_bandwidth and
output_bandwidth < reference_bandwidth + @accaptable_bandwidth_diff
assert output_bandwidth + @acceptable_bandwidth_diff > reference_bandwidth and
output_bandwidth < reference_bandwidth + @acceptable_bandwidth_diff

assert output_without_bandwidth == reference_without_bandwidth
end
Expand Down
122 changes: 116 additions & 6 deletions test/membrane_rtc_engine/rtsp_endpoint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ defmodule Membrane.RTC.RTSPEndpointTest do
use ExUnit.Case

alias Membrane.RTC.Engine
alias Membrane.RTC.Engine.Endpoint.RTSP
alias Membrane.RTC.Engine.Endpoint.{HLS, RTSP}
alias Membrane.RTC.Engine.Message

alias Membrane.HTTPAdaptiveStream.Manifest.SegmentDuration

setup do
options = [
id: "test_rtc"
Expand All @@ -26,6 +28,22 @@ defmodule Membrane.RTC.RTSPEndpointTest do
@loopback_ip "127.0.0.1"
@fake_server_port 554

@hls_endpoint_id "hls-endpoint"
@fixtures_dir "./test/fixtures/"

@fixture_filename "video_baseline.h264"
@fixture_framerate {60, 1}
@fixture_hls_segments 4

# At the moment, this test might not work with H264 profiles with B-frames
# (any except :baseline and :constrained_baseline).
# For more info refer to the issue available here:
# https://membraneframework.atlassian.net/browse/MS-553
@fixture_profile "42c02a"
@fixture_sps <<103, 66, 192, 42, 217, 0, 120, 2, 39, 229, 154, 129, 1, 2, 160, 0, 0, 3, 0, 32,
0, 0, 15, 1, 227, 6, 73>>
@fixture_pps <<104, 203, 131, 203, 32>>

test "invalid URI", %{rtc_engine: rtc_engine} do
rtsp_endpoint = %RTSP{
rtc_engine: rtc_engine,
Expand Down Expand Up @@ -88,10 +106,15 @@ defmodule Membrane.RTC.RTSPEndpointTest do
test "RTSP signalling and disconnects (with a fake server)", %{rtc_engine: rtc_engine} do
self_pid = self()

server_pid =
spawn_link(fn ->
FakeRTSPserver.start(@loopback_ip, @fake_server_port, @rtp_port, self_pid)
end)
start_link_supervised!(
{FakeRTSPserver,
ip: @loopback_ip,
port: @fake_server_port,
client_port: @rtp_port,
parent_pid: self_pid,
stream_ctx: nil},
restart: :temporary
)

assert_receive(:fake_server_ready, 20_000)

Expand Down Expand Up @@ -126,7 +149,94 @@ defmodule Membrane.RTC.RTSPEndpointTest do
assert_receive(%Message.EndpointCrashed{endpoint_id: @rtsp_endpoint_id}, 20_000)

refute_received(_any)
end

@tag :tmp_dir
test "RTSP -> HLS conversion, single H264 input", %{rtc_engine: rtc_engine, tmp_dir: tmp_dir} do
self_pid = self()

start_link_supervised!(
{FakeRTSPserver,
ip: @loopback_ip,
port: @fake_server_port,
client_port: @rtp_port,
parent_pid: self_pid,
stream_ctx: %{
fixture_path: Path.join(@fixtures_dir, @fixture_filename),
framerate: @fixture_framerate,
profile: @fixture_profile,
sps: @fixture_sps,
pps: @fixture_pps
}},
restart: :temporary
)

assert_receive(:fake_server_ready, 20_000)

Process.exit(server_pid, :shutdown)
hls_endpoint = %HLS{
rtc_engine: rtc_engine,
owner: self(),
output_directory: tmp_dir,
synchronize_tracks?: false,
hls_config: %HLS.HLSConfig{
mode: :vod,
target_window_duration: :infinity,
segment_duration: %SegmentDuration{
min: Membrane.Time.seconds(2),
target: Membrane.Time.seconds(2)
}
}
}

:ok = Engine.add_endpoint(rtc_engine, hls_endpoint, endpoint_id: @hls_endpoint_id)

rtsp_endpoint = %RTSP{
rtc_engine: rtc_engine,
source_uri: "rtsp://#{@loopback_ip}:#{@fake_server_port}/stream",
rtp_port: @rtp_port,
pierce_nat: false
}

:ok = Engine.add_endpoint(rtc_engine, rtsp_endpoint, endpoint_id: @rtsp_endpoint_id)

assert_receive(
%Message.EndpointMessage{
endpoint_id: @rtsp_endpoint_id,
message: :rtsp_setup_complete
},
2_000
)

assert_receive(
%Message.EndpointMessage{
endpoint_id: @rtsp_endpoint_id,
message: :new_rtp_stream
},
2_000
)

assert_receive({:playlist_playable, :video, output_dir}, 10_000)

spawn(fn ->
check_presence_of_output_files(output_dir, @fixture_hls_segments, 1000, self_pid)
end)

assert_receive(:output_files_present, 20_000)

refute_received(_any)
end

defp check_presence_of_output_files(dir, n_segment_files, retry_after, notify_pid) do
segments = File.ls!(dir) |> Enum.filter(fn x -> x =~ ~r/^video_segment_[0-9]+_.*\.m4s$/ end)

if length(segments) == n_segment_files and
Enum.all?(segments, fn x ->
Path.join(dir, x) |> File.stat!() |> Map.get(:size) > 0
end) do
send(notify_pid, :output_files_present)
else
Process.sleep(retry_after)
check_presence_of_output_files(dir, n_segment_files, retry_after, notify_pid)
end
end
end
Loading

0 comments on commit f8fdfcb

Please sign in to comment.