Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add URI handling to connection and protocol management #738

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=AudioFrameTest.FromJson"
"--gtest_filter=ExtensionTest.WrongEngineThenCorrectInMigration"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ TEN_RUNTIME_PRIVATE_API bool ten_app_handle_in_msg(ten_app_t *self,
ten_shared_ptr_t *msg,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API ten_connection_t *ten_app_find_src_connection_for_msg(
ten_app_t *self, ten_shared_ptr_t *msg);

TEN_RUNTIME_PRIVATE_API void
ten_app_do_connection_migration_or_push_to_engine_queue(
ten_connection_t *connection, ten_engine_t *engine, ten_shared_ptr_t *msg);
Expand Down
3 changes: 3 additions & 0 deletions core/include_internal/ten_runtime/connection/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ typedef struct ten_connection_t {
ten_signature_t signature;
ten_sanitizer_thread_check_t thread_check;

ten_string_t uri;

// The main thread would update this variable. When the extension thread wants
// to send msgs, it would read this variable to determine if it can send the
// msgs or not. So we need to apply some synchronization method (atomic) on
Expand All @@ -160,6 +162,7 @@ typedef struct ten_connection_t {

bool duplicate;

// =-=-= 不用是 atomic?
ten_atomic_t attach_to; // TEN_CONNECTION_ATTACH_TO
union {
ten_app_t *app;
Expand Down
13 changes: 13 additions & 0 deletions core/include_internal/ten_runtime/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
typedef struct ten_extension_context_t ten_extension_context_t;
typedef struct ten_app_t ten_app_t;
typedef struct ten_env_t ten_env_t;
typedef struct ten_connection_t ten_connection_t;

struct ten_engine_t {
ten_signature_t signature;
Expand Down Expand Up @@ -64,6 +65,9 @@ struct ten_engine_t {

ten_list_t timers;

// Connections that are not connected to the remote.
ten_list_t orphan_connections;

// @{
ten_hashtable_t remotes; // ten_remote_t
ten_list_t weak_remotes;
Expand Down Expand Up @@ -112,3 +116,12 @@ TEN_RUNTIME_PRIVATE_API bool ten_engine_is_ready_to_handle_msg(

TEN_RUNTIME_PRIVATE_API const char *ten_engine_get_id(ten_engine_t *self,
bool check_thread);

TEN_RUNTIME_PRIVATE_API void ten_engine_add_orphan_connection(
ten_engine_t *self, ten_connection_t *connection);

TEN_RUNTIME_PRIVATE_API void ten_engine_del_orphan_connection(
ten_engine_t *self, ten_connection_t *connection);

TEN_RUNTIME_PRIVATE_API ten_connection_t *ten_engine_find_orphan_connection(
ten_engine_t *self, const char *uri);
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ TEN_RUNTIME_PRIVATE_API void ten_engine_on_remote_closed(ten_remote_t *remote,
TEN_RUNTIME_PRIVATE_API bool ten_engine_receive_msg_from_remote(
ten_remote_t *remote, ten_shared_ptr_t *msg, void *user_data);

TEN_RUNTIME_PRIVATE_API void ten_engine_link_connection_to_remote(
TEN_RUNTIME_PRIVATE_API void ten_engine_link_orphan_connection_to_remote(
ten_engine_t *self, ten_connection_t *connection, const char *uri);
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ typedef struct ten_cmd_base_t {
// the engine, and no cmds could be processed any further. So we don't need
// to use sharedptr to wrap the following variable, because when a command
// is being processed, the origin must be alive.
ten_connection_t *original_connection;
ten_connection_t *original_connection; // =-=-= 可以拿掉?

ten_env_transfer_msg_result_handler_func_t result_handler;
void *result_handler_data;
Expand Down
2 changes: 2 additions & 0 deletions core/include_internal/ten_runtime/protocol/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ TEN_RUNTIME_PRIVATE_API ten_string_t *ten_protocol_uri_to_transport_uri(
TEN_RUNTIME_PRIVATE_API void ten_protocol_set_uri(ten_protocol_t *self,
ten_string_t *uri);

TEN_RUNTIME_PRIVATE_API const char *ten_protocol_get_uri(ten_protocol_t *self);

TEN_RUNTIME_PRIVATE_API void ten_protocol_set_addon(
ten_protocol_t *self, ten_addon_host_t *addon_host);

Expand Down
28 changes: 6 additions & 22 deletions core/src/ten_runtime/app/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ static bool ten_app_handle_msg_default_handler(ten_app_t *self,
// client disconnects, the implementation protocol needs to be closed).
ten_connection_migration_state_reset_when_engine_not_found(connection);

// Since this is an incorrect command (sent to a non-existent engine), the
// migration was unsuccessful. Therefore, the connection's URI is reset so
// that the source URI of the next command can potentially become the URI
// of this connection.
ten_string_clear(&connection->uri);

ten_connection_send_msg(connection, resp);
} else {
// The 'msg' might be sent from extension A in engine 1 to extension B in
Expand Down Expand Up @@ -523,28 +529,6 @@ void ten_app_push_to_in_msgs_queue(ten_app_t *self, ten_shared_ptr_t *msg) {
ten_app_handle_in_msgs_async(self);
}

ten_connection_t *ten_app_find_src_connection_for_msg(ten_app_t *self,
ten_shared_ptr_t *msg) {
TEN_ASSERT(self && ten_app_check_integrity(self, false),
"Should not happen.");
TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Invalid argument.");

const char *src_uri = ten_msg_get_src_app_uri(msg);
if (strlen(src_uri)) {
ten_list_foreach (&self->orphan_connections, iter) {
ten_connection_t *connection = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

if (ten_string_is_equal_c_str(&connection->protocol->uri, src_uri)) {
return connection;
}
}
}

return NULL;
}

void ten_app_create_cmd_result_and_dispatch(ten_app_t *self,
ten_shared_ptr_t *origin_cmd,
TEN_STATUS_CODE status_code,
Expand Down
16 changes: 13 additions & 3 deletions core/src/ten_runtime/connection/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "ten_utils/lib/atomic.h"
#include "ten_utils/lib/ref.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/lib/string.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"
#include "ten_utils/sanitizer/thread_check.h"
Expand Down Expand Up @@ -80,6 +81,8 @@ void ten_connection_destroy(ten_connection_t *self) {

ten_signature_set(&self->signature, 0);

ten_string_deinit(&self->uri);

if (self->protocol) {
ten_ref_dec_ref(&self->protocol->ref);
}
Expand Down Expand Up @@ -179,6 +182,8 @@ ten_connection_t *ten_connection_create(ten_protocol_t *protocol) {

self->migration_state = TEN_CONNECTION_MIGRATION_STATE_INIT;

ten_string_init(&self->uri);

ten_atomic_store(&self->is_closing, 0);
self->is_closed = false;

Expand Down Expand Up @@ -354,7 +359,8 @@ void ten_connection_on_msgs(ten_connection_t *self, ten_list_t *msgs) {
// it in this case now.
const char *cmd_id = ten_cmd_base_get_cmd_id(msg);
TEN_ASSERT(cmd_id, "Should not happen.");
if (!strlen(cmd_id)) {

if (strlen(cmd_id) == 0) {
ten_connection_handle_command_from_external_client(self, msg);
}
} else {
Expand All @@ -367,6 +373,12 @@ void ten_connection_on_msgs(ten_connection_t *self, ten_list_t *msgs) {
}
}

// If this connection has not been assigned a URI yet, the source URI of the
// first received command will become the URI of this connection.
if (ten_string_is_empty(&self->uri)) {
ten_string_set_from_c_str(&self->uri, ten_msg_get_src_app_uri(msg));
}

// Send into the TEN runtime to be processed.
ten_connection_on_input(self, msg, &err);
}
Expand Down Expand Up @@ -410,8 +422,6 @@ void ten_connection_attach_to_remote(ten_connection_t *self,
ten_atomic_store(&self->attach_to, TEN_CONNECTION_ATTACH_TO_REMOTE);
self->attached_target.remote = remote;

ten_connection_set_on_closed(self, ten_remote_on_connection_closed, remote);

if (self->protocol) {
ten_protocol_set_uri(self->protocol, &remote->uri);
}
Expand Down
7 changes: 1 addition & 6 deletions core/src/ten_runtime/connection/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,7 @@ void ten_connection_upgrade_migration_state_to_done(ten_connection_t *self,
self->attached_target.engine = engine;
ten_atomic_store(&self->attach_to, TEN_CONNECTION_ATTACH_TO_ENGINE);

// We have to set the 'on_closed' callback to destroy the connection, if the
// connection is being closed before the corresponding 'ten_remote_t' object
// is created. Ex: the connection is 'duplicated' in the 'start_graph'
// stage, refer to
// 'ten_engine_close_duplicated_remote_or_upgrade_it_to_normal()'.
ten_connection_set_on_closed(self, ten_engine_on_connection_closed, NULL);
ten_engine_add_orphan_connection(engine, self);
// @}
}

Expand Down
94 changes: 94 additions & 0 deletions core/src/ten_runtime/engine/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "ten_utils/lib/string.h"
#include "ten_utils/lib/uuid.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"
#include "ten_utils/sanitizer/thread_check.h"

bool ten_engine_check_integrity(ten_engine_t *self, bool check_thread) {
Expand Down Expand Up @@ -67,6 +68,9 @@ void ten_engine_destroy(ten_engine_t *self) {

ten_signature_set(&self->signature, 0);

TEN_ASSERT(ten_list_is_empty(&self->orphan_connections),
"Should not happen.");

ten_hashtable_deinit(&self->remotes);
ten_list_clear(&self->weak_remotes);

Expand Down Expand Up @@ -184,6 +188,8 @@ ten_engine_t *ten_engine_create(ten_app_t *app, ten_shared_ptr_t *cmd) {
self->belonging_thread_is_set = NULL;
self->is_ready_to_handle_msg = false;

ten_list_init(&self->orphan_connections);

ten_hashtable_init(&self->remotes,
offsetof(ten_remote_t, hh_in_remote_table));
ten_list_init(&self->weak_remotes);
Expand Down Expand Up @@ -255,3 +261,91 @@ const char *ten_engine_get_id(ten_engine_t *self, bool check_thread) {

return ten_string_get_raw_str(&self->graph_id);
}

void ten_engine_del_orphan_connection(ten_engine_t *self,
ten_connection_t *connection) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

TEN_LOGD("[%s] Remove a orphan connection %p", ten_engine_get_id(self, true),
connection);

TEN_UNUSED bool rc =
ten_list_remove_ptr(&self->orphan_connections, connection);
TEN_ASSERT(rc, "Should not happen.");

connection->on_closed = NULL;
connection->on_closed_data = NULL;
}

static void ten_engine_on_orphan_connection_closed(
ten_connection_t *connection, TEN_UNUSED void *on_closed_data) {
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

ten_engine_t *self = connection->attached_target.engine;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

TEN_LOGD("[%s] Orphan connection %p closed", ten_engine_get_id(self, true),
connection);

ten_engine_del_orphan_connection(self, connection);
ten_connection_destroy(connection);

// Check if the app is in the closing phase.
if (ten_engine_is_closing(self)) {
TEN_LOGD("[%s] Engine is closing, check to see if it could proceed.",
ten_engine_get_id(self, true));
ten_engine_on_close(self);
} else {
// If 'connection' is an orphan connection, it means the connection is not
// attached to an engine, and the TEN app should _not_ be closed due to an
// strange connection like this, otherwise, the TEN app will be very
// fragile, anyone could simply connect to the TEN app, and close the app
// through disconnection.
}
}

void ten_engine_add_orphan_connection(ten_engine_t *self,
ten_connection_t *connection) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

TEN_LOGD("[%s] Add a orphan connection %p[%s] (total cnt %zu)",
ten_engine_get_id(self, true), connection,
ten_string_get_raw_str(&connection->uri),
ten_list_size(&self->orphan_connections));

ten_connection_set_on_closed(connection,
ten_engine_on_orphan_connection_closed, NULL);

// Do not set 'ten_connection_destroy' as the destroy function, because we
// might _move_ a connection out of 'orphan_connections' list when it is
// associated with an engine.
ten_list_push_ptr_back(&self->orphan_connections, connection, NULL);
}

ten_connection_t *ten_engine_find_orphan_connection(ten_engine_t *self,
const char *uri) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

if (strlen(uri)) {
ten_list_foreach (&self->orphan_connections, iter) {
ten_connection_t *connection = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

if (ten_string_is_equal_c_str(&connection->uri, uri)) {
return connection;
}
}
}

return NULL;
}
31 changes: 22 additions & 9 deletions core/src/ten_runtime/engine/internal/remote_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,26 +223,32 @@ static ten_remote_t *ten_engine_find_remote(ten_engine_t *self,
return NULL;
}

void ten_engine_link_connection_to_remote(ten_engine_t *self,
ten_connection_t *connection,
const char *uri) {
void ten_engine_link_orphan_connection_to_remote(
ten_engine_t *self, ten_connection_t *orphan_connection, const char *uri) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_engine_check_integrity(self, true),
"Invalid use of engine %p.", self);

TEN_ASSERT(connection, "Invalid argument.");
TEN_ASSERT(ten_connection_check_integrity(connection, true),
"Invalid use of engine %p.", connection);
TEN_ASSERT(orphan_connection, "Invalid argument.");
TEN_ASSERT(ten_connection_check_integrity(orphan_connection, true),
"Invalid use of engine %p.", orphan_connection);

TEN_ASSERT(uri, "Invalid argument.");

ten_remote_t *remote = ten_engine_find_remote(self, uri);
TEN_ASSERT(
!remote,
!ten_engine_find_remote(self, uri),
"The relationship of remote and connection should be 1-1 mapping.");

remote = ten_remote_create_for_engine(uri, self, connection);
ten_remote_t *remote =
ten_remote_create_for_engine(uri, self, orphan_connection);
ten_engine_add_remote(self, remote);

ten_engine_del_orphan_connection(self, orphan_connection);

// Since the connection is already connected to the remote, the remote also
// needs to be triggered to close when the connection is closed.
ten_connection_set_on_closed(orphan_connection,
ten_remote_on_connection_closed, remote);
}

static void ten_engine_on_remote_protocol_created(ten_env_t *ten_env,
Expand All @@ -261,6 +267,8 @@ static void ten_engine_on_remote_protocol_created(ten_env_t *ten_env,
ten_connection_t *connection = ten_connection_create(protocol);
TEN_ASSERT(connection, "Should not happen.");

ten_string_copy(&connection->uri, &protocol->uri);

// This is in the 'connect_to' stage, the 'connection' already attaches to the
// engine, no migration is needed.
ten_connection_set_migration_state(connection,
Expand All @@ -270,6 +278,11 @@ static void ten_engine_on_remote_protocol_created(ten_env_t *ten_env,
ten_string_get_raw_str(&protocol->uri), self, connection);
TEN_ASSERT(remote, "Should not happen.");

// Since the connection is already connected to the remote, the remote also
// needs to be triggered to close when the connection is closed.
ten_connection_set_on_closed(connection, ten_remote_on_connection_closed,
remote);

if (ctx->cb) {
ctx->cb(self, remote, ctx->user_data);
}
Expand Down
Loading