Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support OPUS #24

Merged
merged 42 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b490ec9
Support OPUS
FelonEkonom Dec 12, 2024
32692e7
Fix typo
FelonEkonom Dec 12, 2024
117ba41
Rename variable
FelonEkonom Dec 12, 2024
22be939
Opus test
FelonEkonom Dec 12, 2024
8171bc2
Fix ambiugous enum
FelonEkonom Dec 12, 2024
48561d8
Mix format
FelonEkonom Dec 12, 2024
2ef5e2f
Fix test name
FelonEkonom Dec 12, 2024
9fbfb4c
Fix test name once again
FelonEkonom Dec 12, 2024
b1c0359
Set tests async false
FelonEkonom Dec 12, 2024
40c00c8
Maybr fix test
FelonEkonom Dec 12, 2024
ada11a6
Expect pad options if codec is opus
FelonEkonom Dec 12, 2024
ab46734
Fix typo
FelonEkonom Dec 12, 2024
67b969b
Remove test warn
FelonEkonom Dec 12, 2024
60f8900
Add tmp inspect
FelonEkonom Dec 12, 2024
9a55bd9
pass new fields in pad options in test
FelonEkonom Dec 12, 2024
a54e060
Rename pad option
FelonEkonom Dec 13, 2024
60c680b
Comment out opus test
FelonEkonom Dec 13, 2024
c8ea493
Fix options type
FelonEkonom Dec 13, 2024
60c3a37
Maybe fix test
FelonEkonom Dec 13, 2024
4ece42a
Fix typo
FelonEkonom Dec 13, 2024
2d5ac3f
Add debug logs
FelonEkonom Dec 13, 2024
c4dd318
Add debug log in NIF
FelonEkonom Dec 13, 2024
8b8c77e
Fix bugs in handling opus in agora sink
FelonEkonom Jan 10, 2025
bd25ab8
remove dump
FelonEkonom Jan 17, 2025
4c1d370
Cleanup test
FelonEkonom Jan 17, 2025
ae9461c
Add pad option description
FelonEkonom Jan 17, 2025
373739f
Fix credo
FelonEkonom Jan 17, 2025
3a228bd
Add compilation warning
FelonEkonom Jan 17, 2025
d0d2896
Make tolerance wider
FelonEkonom Jan 17, 2025
6126357
Fix error log
FelonEkonom Jan 17, 2025
94377d5
Fix opus fixtures
FelonEkonom Jan 17, 2025
fd0a686
Fix tests
FelonEkonom Jan 21, 2025
9faf7cf
mix format
FelonEkonom Jan 21, 2025
300951f
Fix credo
FelonEkonom Jan 21, 2025
eb8bb15
Apply comments from CR
FelonEkonom Jan 31, 2025
5e4f34d
Fix typo
FelonEkonom Jan 31, 2025
4f9d08a
Handle case when opus buffers dont have duration
FelonEkonom Feb 3, 2025
cd4b061
Fix typo
FelonEkonom Feb 4, 2025
e7143bc
Merge remote-tracking branch 'origin/master' into support-opus
FelonEkonom Feb 4, 2025
08024b8
Fix bug
FelonEkonom Feb 4, 2025
df7ba2b
Remove commented code
FelonEkonom Feb 5, 2025
53cad24
Adjust to alias
FelonEkonom Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions c_src/membrane_agora_plugin/sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
scfg.enableVideo = true;
scfg.useStringUid = false;
if (state->service->initialize(scfg) != agora::ERR_OK) {
AG_LOG(ERROR, "Failed to initialize service");
AG_LOG(ERROR, "Failed to initialize sink service");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AG_LOG(ERROR, "Failed to initialize sink service");
AG_LOG(ERROR, "Failed to initialize Agora service in sink");

unifex_release_state(env, state);
return create_result_error(env, "Failed to initialize service");
}
Expand Down Expand Up @@ -151,12 +151,18 @@ UNIFEX_TERM update_audio_stream_format(UnifexEnv *env, int sampleRate,
}

UNIFEX_TERM write_audio_data(UnifexEnv *env, UnifexPayload *payload,
SinkState *state) {
CodecAudio codec, SinkState *state) {
agora::rtc::EncodedAudioFrameInfo audioFrameInfo;
audioFrameInfo.sampleRateHz = state->sampleRate;
audioFrameInfo.numberOfChannels = state->numberOfChannels;
audioFrameInfo.samplesPerChannel = state->samplesPerChannelPerFrame;
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_AACLC;

if (codec == CODEC_AUDIO_AAC) {
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_AACLC;
} else if (codec == CODEC_AUDIO_OPUS) {
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_OPUS;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] I would warn if other codec was passed here. I know that now the only options for CodecAudio are CODEC_AUDIO_AAC and CODEC_AUDIO_OPUS, but it might cause trouble in the future if we were to add some further codecs.


if (state->audioEncodedFrameSender->sendEncodedAudioFrame(
reinterpret_cast<uint8_t *>(payload->data), payload->size,
audioFrameInfo) != true) {
Expand Down Expand Up @@ -186,13 +192,13 @@ void handle_destroy_state(UnifexEnv *env, SinkState *state) {
AG_LOG(ERROR, "Failed to disconnect from Agora channel!");
return;
}
AG_LOG(INFO, "Disconnected from Agora channel successfully");
AG_LOG(INFO, "[Sink] Disconnected from Agora channel successfully");
state->connection = NULL;
}

if (state->service) {
state->service->release();
AG_LOG(INFO, "Agora service released successfully");
AG_LOG(INFO, "[Sink] Agora service released successfully");
state->service = NULL;
}
}
4 changes: 3 additions & 1 deletion c_src/membrane_agora_plugin/sink.spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module(Membrane.Agora.Sink.Native)

state_type("SinkState")

type codec_audio :: :aac | :opus

spec(
create(
app_id :: string,
Expand All @@ -22,7 +24,7 @@ spec(
)

spec(
write_audio_data(payload, state) ::
write_audio_data(payload, codec :: codec_audio, state) ::
(:ok :: label) | {:error :: label, reason :: atom}
)

Expand Down
46 changes: 29 additions & 17 deletions c_src/membrane_agora_plugin/source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
scfg.enableVideo = true;
scfg.useStringUid = false;
if (state->service->initialize(scfg) != agora::ERR_OK) {
AG_LOG(ERROR, "Failed to initialize service");
AG_LOG(ERROR, "Failed to initialize source service");
state->service = NULL;
unifex_release_state(env, state);
return create_result_error(env, "Failed to initialize service");
}
Expand Down Expand Up @@ -77,33 +78,44 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
}

void handle_destroy_state(UnifexEnv *env, SourceState *state) {
state->connection->unregisterObserver(state->connObserver.get());
state->connObserver.reset();

state->connection->getLocalUser()->unregisterLocalUserObserver(
state->localUserObserver.get());
state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver(
state->videoEncodedFrameObserver.get());
state->connection->getLocalUser()->unregisterAudioFrameObserver(
state->audioFrameObserver.get());

state->localUserObserver.reset();
state->videoEncodedFrameObserver.reset();
state->audioFrameObserver.reset();

UNUSED(env);
if (state->connection) {
state->connection->unregisterObserver(state->connObserver.get());
state->connObserver.reset();
}

if (state->connection && state->localUserObserver)
state->connection->getLocalUser()->unregisterLocalUserObserver(
state->localUserObserver.get());

if (state->connection && state->videoEncodedFrameObserver)
state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver(
state->videoEncodedFrameObserver.get());
if (state->connection && state->audioFrameObserver)
state->connection->getLocalUser()->unregisterAudioFrameObserver(
state->audioFrameObserver.get());

if (state->localUserObserver)
state->localUserObserver.reset();

if (state->videoEncodedFrameObserver)
state->videoEncodedFrameObserver.reset();

if (state->audioFrameObserver)
state->audioFrameObserver.reset();

if (state->connection) {
if (state->connection->disconnect()) {
AG_LOG(ERROR, "Failed to disconnect from Agora channel!");
return;
}
AG_LOG(INFO, "Disconnected from Agora channel successfully");
AG_LOG(INFO, "[Source] Disconnected from Agora channel successfully");
state->connection = NULL;
}

if (state->service) {
state->service->release();
AG_LOG(INFO, "Agora service released successfully");
AG_LOG(INFO, "[Source] Agora service released successfully");
state->service = NULL;
}
}
74 changes: 63 additions & 11 deletions lib/agora/agora_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Membrane.Agora.Sink do

require Membrane.Pad, as: Pad

alias Membrane.{AAC, Opus}
alias Membrane.Agora.Sink.Native

def_input_pad :video,
Expand All @@ -16,8 +17,22 @@ defmodule Membrane.Agora.Sink do

def_input_pad :audio,
availability: :on_request,
accepted_format: Membrane.AAC,
flow_control: :auto
accepted_format: any_of(Membrane.AAC, Membrane.Opus),
flow_control: :auto,
options: [
sample_rate: [
spec: pos_integer(),
default: 48_000,
description: """
Sample rate of the audio stream going through :audio pad.

Used only if the audio codec is `Membrane.Opus`. If the audio codec is
`Membrane.AAC`, sample rate value will be passed in the stream format.

Defaults to 48 000.
"""
]
]

def_options app_id: [
spec: String.t(),
Expand Down Expand Up @@ -54,7 +69,8 @@ defmodule Membrane.Agora.Sink do
token: opts.token,
channel_name: opts.channel_name,
user_id: opts.user_id,
native_state: nil
native_state: nil,
last_frame_duration: nil
}

{[], state}
Expand All @@ -69,8 +85,8 @@ defmodule Membrane.Agora.Sink do
_e in UndefinedFunctionError ->
reraise(
"""
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH: \
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")} \
""",
__STACKTRACE__
)
Expand All @@ -96,18 +112,24 @@ defmodule Membrane.Agora.Sink do
end

@impl true
def handle_stream_format(Pad.ref(:audio, _id), stream_format, _ctx, state) do
def handle_stream_format(Pad.ref(:audio, _id), %Membrane.AAC{} = aac, _ctx, state) do
{:ok, native_state} =
Native.update_audio_stream_format(
stream_format.sample_rate,
stream_format.channels,
stream_format.samples_per_frame,
aac.sample_rate,
aac.channels,
aac.samples_per_frame,
state.native_state
)

{[], %{state | native_state: native_state}}
end

@impl true
def handle_stream_format(Pad.ref(:audio, _id), %Opus{}, _ctx, state) do
# audio stream format will be updated in handle_buffer/4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] this comment is quite ambiguos for me

{[], state}
end

@impl true
def handle_buffer(Pad.ref(:video, _id), buffer, _ctx, state) do
:ok =
Expand All @@ -121,8 +143,38 @@ defmodule Membrane.Agora.Sink do
end

@impl true
def handle_buffer(Pad.ref(:audio, _id), buffer, _ctx, state) do
:ok = Native.write_audio_data(buffer.payload, state.native_state)
def handle_buffer(Pad.ref(:audio, _id) = pad, buffer, ctx, state) do
stream_format =
case ctx.pads[pad].stream_format do
%Opus{} -> :opus
%AAC{} -> :aac
end

state =
if stream_format == :opus and buffer.metadata.duration != state.last_frame_duration do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metadata.duration isn't available outside of our pipeline, we add it with DurationAddingFilter

update_frame_duration(buffer.metadata.duration, pad, ctx, state)
else
state
end

:ok = Native.write_audio_data(buffer.payload, stream_format, state.native_state)
{[], state}
end

defp update_frame_duration(frame_duration, pad, ctx, state) do
pad_data = ctx.pads[pad]

sample_rate = pad_data.options.sample_rate
samples_per_frame = (frame_duration * sample_rate) |> div(1000)

{:ok, native_state} =
Native.update_audio_stream_format(
sample_rate,
pad_data.stream_format.channels,
samples_per_frame,
state.native_state
)

%{state | native_state: native_state, last_frame_duration: frame_duration}
end
end
9 changes: 8 additions & 1 deletion lib/agora/agora_sink_native.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
defmodule Membrane.Agora.Sink.Native do
@moduledoc false

if match?(%{os: "linux", architecture: "x86_64"}, Bundlex.get_target()) do
target = Bundlex.get_target()

if match?(%{os: "linux", architecture: "x86_64"}, target) do
use Unifex.Loader
else
IO.warn("""
Agora SDK used by #{inspect(__MODULE__)} works only on linux with architecture x86_64, while \
you are now on #{inspect(target.os)} with architecture #{inspect(target.architecture)}.
""")
end
end
4 changes: 2 additions & 2 deletions lib/agora/agora_source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ defmodule Membrane.Agora.Source do
_e in UndefinedFunctionError ->
reraise(
"""
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH: \
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")} \
""",
__STACKTRACE__
)
Expand Down
9 changes: 8 additions & 1 deletion lib/agora/agora_source_native.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
defmodule Membrane.Agora.Source.Native do
@moduledoc false

if match?(%{os: "linux", architecture: "x86_64"}, Bundlex.get_target()) do
target = Bundlex.get_target()

if match?(%{os: "linux", architecture: "x86_64"}, target) do
use Unifex.Loader
else
IO.warn("""
Agora SDK used by #{inspect(__MODULE__)} works only on linux with architecture x86_64, while \
you are now on #{inspect(target.os)} with architecture #{inspect(target.architecture)}.
""")
end
end
3 changes: 3 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ defmodule Membrane.Agora.Mixfile do
{:membrane_core, "~> 1.0"},
{:membrane_h264_format, "~> 0.6.1"},
{:membrane_aac_format, "~> 0.8.0"},
{:membrane_opus_format, "~> 0.3.0"},
{:membrane_raw_audio_format, "~> 0.12.0"},
{:membrane_ogg_plugin, "~> 0.5.0"},
{:unifex, "~> 1.1"},
{:membrane_file_plugin, "~> 0.16.0", only: :test},
{:membrane_h26x_plugin, "~> 0.10.0", only: :test},
{:membrane_aac_plugin, "~> 0.18.1", only: :test},
{:membrane_opus_plugin, "~> 0.20.4", only: :test},
{:membrane_realtimer_plugin, "~> 0.9.0", only: :test},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
Expand Down
Loading