Skip to content

Commit

Permalink
feat: add URI handling to connection and protocol management (#738)
Browse files Browse the repository at this point in the history
  • Loading branch information
halajohn authored Feb 22, 2025
1 parent 201077f commit 0d2736b
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=AudioFrameTest.FromJson"
"--gtest_filter=ExtensionTest.WrongEngineThenCorrectInMigration"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
3 changes: 0 additions & 3 deletions core/include_internal/ten_runtime/app/msg_interface/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ TEN_RUNTIME_PRIVATE_API bool ten_app_handle_in_msg(ten_app_t *self,
ten_shared_ptr_t *msg,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API ten_connection_t *ten_app_find_src_connection_for_msg(
ten_app_t *self, ten_shared_ptr_t *msg);

TEN_RUNTIME_PRIVATE_API void
ten_app_do_connection_migration_or_push_to_engine_queue(
ten_connection_t *connection, ten_engine_t *engine, ten_shared_ptr_t *msg);
Expand Down
3 changes: 3 additions & 0 deletions core/include_internal/ten_runtime/connection/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ typedef struct ten_connection_t {
ten_signature_t signature;
ten_sanitizer_thread_check_t thread_check;

ten_string_t uri;

// The main thread would update this variable. When the extension thread wants
// to send msgs, it would read this variable to determine if it can send the
// msgs or not. So we need to apply some synchronization method (atomic) on
Expand All @@ -160,6 +162,7 @@ typedef struct ten_connection_t {

bool duplicate;

// =-=-= 不用是 atomic?
ten_atomic_t attach_to; // TEN_CONNECTION_ATTACH_TO
union {
ten_app_t *app;
Expand Down
13 changes: 13 additions & 0 deletions core/include_internal/ten_runtime/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
typedef struct ten_extension_context_t ten_extension_context_t;
typedef struct ten_app_t ten_app_t;
typedef struct ten_env_t ten_env_t;
typedef struct ten_connection_t ten_connection_t;

struct ten_engine_t {
ten_signature_t signature;
Expand Down Expand Up @@ -64,6 +65,9 @@ struct ten_engine_t {

ten_list_t timers;

// Connections that are not connected to the remote.
ten_list_t orphan_connections;

// @{
ten_hashtable_t remotes; // ten_remote_t
ten_list_t weak_remotes;
Expand Down Expand Up @@ -112,3 +116,12 @@ TEN_RUNTIME_PRIVATE_API bool ten_engine_is_ready_to_handle_msg(

TEN_RUNTIME_PRIVATE_API const char *ten_engine_get_id(ten_engine_t *self,
bool check_thread);

TEN_RUNTIME_PRIVATE_API void ten_engine_add_orphan_connection(
ten_engine_t *self, ten_connection_t *connection);

TEN_RUNTIME_PRIVATE_API void ten_engine_del_orphan_connection(
ten_engine_t *self, ten_connection_t *connection);

TEN_RUNTIME_PRIVATE_API ten_connection_t *ten_engine_find_orphan_connection(
ten_engine_t *self, const char *uri);
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ TEN_RUNTIME_PRIVATE_API void ten_engine_on_remote_closed(ten_remote_t *remote,
TEN_RUNTIME_PRIVATE_API bool ten_engine_receive_msg_from_remote(
ten_remote_t *remote, ten_shared_ptr_t *msg, void *user_data);

TEN_RUNTIME_PRIVATE_API void ten_engine_link_connection_to_remote(
TEN_RUNTIME_PRIVATE_API void ten_engine_link_orphan_connection_to_remote(
ten_engine_t *self, ten_connection_t *connection, const char *uri);
2 changes: 1 addition & 1 deletion core/include_internal/ten_runtime/msg/cmd_base/cmd_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ typedef struct ten_cmd_base_t {
// the engine, and no cmds could be processed any further. So we don't need
// to use sharedptr to wrap the following variable, because when a command
// is being processed, the origin must be alive.
ten_connection_t *original_connection;
ten_connection_t *original_connection; // =-=-= 可以拿掉?

ten_env_transfer_msg_result_handler_func_t result_handler;
void *result_handler_data;
Expand Down
2 changes: 2 additions & 0 deletions core/include_internal/ten_runtime/protocol/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ TEN_RUNTIME_PRIVATE_API ten_string_t *ten_protocol_uri_to_transport_uri(
TEN_RUNTIME_PRIVATE_API void ten_protocol_set_uri(ten_protocol_t *self,
ten_string_t *uri);

TEN_RUNTIME_PRIVATE_API const char *ten_protocol_get_uri(ten_protocol_t *self);

TEN_RUNTIME_PRIVATE_API void ten_protocol_set_addon(
ten_protocol_t *self, ten_addon_host_t *addon_host);

Expand Down
28 changes: 6 additions & 22 deletions core/src/ten_runtime/app/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ static bool ten_app_handle_msg_default_handler(ten_app_t *self,
// client disconnects, the implementation protocol needs to be closed).
ten_connection_migration_state_reset_when_engine_not_found(connection);

// Since this is an incorrect command (sent to a non-existent engine), the
// migration was unsuccessful. Therefore, the connection's URI is reset so
// that the source URI of the next command can potentially become the URI
// of this connection.
ten_string_clear(&connection->uri);

ten_connection_send_msg(connection, resp);
} else {
// The 'msg' might be sent from extension A in engine 1 to extension B in
Expand Down Expand Up @@ -523,28 +529,6 @@ void ten_app_push_to_in_msgs_queue(ten_app_t *self, ten_shared_ptr_t *msg) {
ten_app_handle_in_msgs_async(self);
}

ten_connection_t *ten_app_find_src_connection_for_msg(ten_app_t *self,
ten_shared_ptr_t *msg) {
TEN_ASSERT(self && ten_app_check_integrity(self, false),
"Should not happen.");
TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Invalid argument.");

const char *src_uri = ten_msg_get_src_app_uri(msg);
if (strlen(src_uri)) {
ten_list_foreach (&self->orphan_connections, iter) {
ten_connection_t *connection = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

if (ten_string_is_equal_c_str(&connection->protocol->uri, src_uri)) {
return connection;
}
}
}

return NULL;
}

void ten_app_create_cmd_result_and_dispatch(ten_app_t *self,
ten_shared_ptr_t *origin_cmd,
TEN_STATUS_CODE status_code,
Expand Down
16 changes: 13 additions & 3 deletions core/src/ten_runtime/connection/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "ten_utils/lib/atomic.h"
#include "ten_utils/lib/ref.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/lib/string.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"
#include "ten_utils/sanitizer/thread_check.h"
Expand Down Expand Up @@ -80,6 +81,8 @@ void ten_connection_destroy(ten_connection_t *self) {

ten_signature_set(&self->signature, 0);

ten_string_deinit(&self->uri);

if (self->protocol) {
ten_ref_dec_ref(&self->protocol->ref);
}
Expand Down Expand Up @@ -179,6 +182,8 @@ ten_connection_t *ten_connection_create(ten_protocol_t *protocol) {

self->migration_state = TEN_CONNECTION_MIGRATION_STATE_INIT;

ten_string_init(&self->uri);

ten_atomic_store(&self->is_closing, 0);
self->is_closed = false;

Expand Down Expand Up @@ -354,7 +359,8 @@ void ten_connection_on_msgs(ten_connection_t *self, ten_list_t *msgs) {
// it in this case now.
const char *cmd_id = ten_cmd_base_get_cmd_id(msg);
TEN_ASSERT(cmd_id, "Should not happen.");
if (!strlen(cmd_id)) {

if (strlen(cmd_id) == 0) {
ten_connection_handle_command_from_external_client(self, msg);
}
} else {
Expand All @@ -367,6 +373,12 @@ void ten_connection_on_msgs(ten_connection_t *self, ten_list_t *msgs) {
}
}

// If this connection has not been assigned a URI yet, the source URI of the
// first received command will become the URI of this connection.
if (ten_string_is_empty(&self->uri)) {
ten_string_set_from_c_str(&self->uri, ten_msg_get_src_app_uri(msg));
}

// Send into the TEN runtime to be processed.
ten_connection_on_input(self, msg, &err);
}
Expand Down Expand Up @@ -410,8 +422,6 @@ void ten_connection_attach_to_remote(ten_connection_t *self,
ten_atomic_store(&self->attach_to, TEN_CONNECTION_ATTACH_TO_REMOTE);
self->attached_target.remote = remote;

ten_connection_set_on_closed(self, ten_remote_on_connection_closed, remote);

if (self->protocol) {
ten_protocol_set_uri(self->protocol, &remote->uri);
}
Expand Down
7 changes: 1 addition & 6 deletions core/src/ten_runtime/connection/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,7 @@ void ten_connection_upgrade_migration_state_to_done(ten_connection_t *self,
self->attached_target.engine = engine;
ten_atomic_store(&self->attach_to, TEN_CONNECTION_ATTACH_TO_ENGINE);

// We have to set the 'on_closed' callback to destroy the connection, if the
// connection is being closed before the corresponding 'ten_remote_t' object
// is created. Ex: the connection is 'duplicated' in the 'start_graph'
// stage, refer to
// 'ten_engine_close_duplicated_remote_or_upgrade_it_to_normal()'.
ten_connection_set_on_closed(self, ten_engine_on_connection_closed, NULL);
ten_engine_add_orphan_connection(engine, self);
// @}
}

Expand Down
94 changes: 94 additions & 0 deletions core/src/ten_runtime/engine/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "ten_utils/lib/string.h"
#include "ten_utils/lib/uuid.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"
#include "ten_utils/sanitizer/thread_check.h"

bool ten_engine_check_integrity(ten_engine_t *self, bool check_thread) {
Expand Down Expand Up @@ -67,6 +68,9 @@ void ten_engine_destroy(ten_engine_t *self) {

ten_signature_set(&self->signature, 0);

TEN_ASSERT(ten_list_is_empty(&self->orphan_connections),
"Should not happen.");

ten_hashtable_deinit(&self->remotes);
ten_list_clear(&self->weak_remotes);

Expand Down Expand Up @@ -184,6 +188,8 @@ ten_engine_t *ten_engine_create(ten_app_t *app, ten_shared_ptr_t *cmd) {
self->belonging_thread_is_set = NULL;
self->is_ready_to_handle_msg = false;

ten_list_init(&self->orphan_connections);

ten_hashtable_init(&self->remotes,
offsetof(ten_remote_t, hh_in_remote_table));
ten_list_init(&self->weak_remotes);
Expand Down Expand Up @@ -255,3 +261,91 @@ const char *ten_engine_get_id(ten_engine_t *self, bool check_thread) {

return ten_string_get_raw_str(&self->graph_id);
}

void ten_engine_del_orphan_connection(ten_engine_t *self,
ten_connection_t *connection) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

TEN_LOGD("[%s] Remove a orphan connection %p", ten_engine_get_id(self, true),
connection);

TEN_UNUSED bool rc =
ten_list_remove_ptr(&self->orphan_connections, connection);
TEN_ASSERT(rc, "Should not happen.");

connection->on_closed = NULL;
connection->on_closed_data = NULL;
}

static void ten_engine_on_orphan_connection_closed(
ten_connection_t *connection, TEN_UNUSED void *on_closed_data) {
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

ten_engine_t *self = connection->attached_target.engine;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

TEN_LOGD("[%s] Orphan connection %p closed", ten_engine_get_id(self, true),
connection);

ten_engine_del_orphan_connection(self, connection);
ten_connection_destroy(connection);

// Check if the app is in the closing phase.
if (ten_engine_is_closing(self)) {
TEN_LOGD("[%s] Engine is closing, check to see if it could proceed.",
ten_engine_get_id(self, true));
ten_engine_on_close(self);
} else {
// If 'connection' is an orphan connection, it means the connection is not
// attached to an engine, and the TEN app should _not_ be closed due to an
// strange connection like this, otherwise, the TEN app will be very
// fragile, anyone could simply connect to the TEN app, and close the app
// through disconnection.
}
}

void ten_engine_add_orphan_connection(ten_engine_t *self,
ten_connection_t *connection) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

TEN_LOGD("[%s] Add a orphan connection %p[%s] (total cnt %zu)",
ten_engine_get_id(self, true), connection,
ten_string_get_raw_str(&connection->uri),
ten_list_size(&self->orphan_connections));

ten_connection_set_on_closed(connection,
ten_engine_on_orphan_connection_closed, NULL);

// Do not set 'ten_connection_destroy' as the destroy function, because we
// might _move_ a connection out of 'orphan_connections' list when it is
// associated with an engine.
ten_list_push_ptr_back(&self->orphan_connections, connection, NULL);
}

ten_connection_t *ten_engine_find_orphan_connection(ten_engine_t *self,
const char *uri) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

if (strlen(uri)) {
ten_list_foreach (&self->orphan_connections, iter) {
ten_connection_t *connection = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

if (ten_string_is_equal_c_str(&connection->uri, uri)) {
return connection;
}
}
}

return NULL;
}
31 changes: 22 additions & 9 deletions core/src/ten_runtime/engine/internal/remote_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,26 +223,32 @@ static ten_remote_t *ten_engine_find_remote(ten_engine_t *self,
return NULL;
}

void ten_engine_link_connection_to_remote(ten_engine_t *self,
ten_connection_t *connection,
const char *uri) {
void ten_engine_link_orphan_connection_to_remote(
ten_engine_t *self, ten_connection_t *orphan_connection, const char *uri) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_engine_check_integrity(self, true),
"Invalid use of engine %p.", self);

TEN_ASSERT(connection, "Invalid argument.");
TEN_ASSERT(ten_connection_check_integrity(connection, true),
"Invalid use of engine %p.", connection);
TEN_ASSERT(orphan_connection, "Invalid argument.");
TEN_ASSERT(ten_connection_check_integrity(orphan_connection, true),
"Invalid use of engine %p.", orphan_connection);

TEN_ASSERT(uri, "Invalid argument.");

ten_remote_t *remote = ten_engine_find_remote(self, uri);
TEN_ASSERT(
!remote,
!ten_engine_find_remote(self, uri),
"The relationship of remote and connection should be 1-1 mapping.");

remote = ten_remote_create_for_engine(uri, self, connection);
ten_remote_t *remote =
ten_remote_create_for_engine(uri, self, orphan_connection);
ten_engine_add_remote(self, remote);

ten_engine_del_orphan_connection(self, orphan_connection);

// Since the connection is already connected to the remote, the remote also
// needs to be triggered to close when the connection is closed.
ten_connection_set_on_closed(orphan_connection,
ten_remote_on_connection_closed, remote);
}

static void ten_engine_on_remote_protocol_created(ten_env_t *ten_env,
Expand All @@ -261,6 +267,8 @@ static void ten_engine_on_remote_protocol_created(ten_env_t *ten_env,
ten_connection_t *connection = ten_connection_create(protocol);
TEN_ASSERT(connection, "Should not happen.");

ten_string_copy(&connection->uri, &protocol->uri);

// This is in the 'connect_to' stage, the 'connection' already attaches to the
// engine, no migration is needed.
ten_connection_set_migration_state(connection,
Expand All @@ -270,6 +278,11 @@ static void ten_engine_on_remote_protocol_created(ten_env_t *ten_env,
ten_string_get_raw_str(&protocol->uri), self, connection);
TEN_ASSERT(remote, "Should not happen.");

// Since the connection is already connected to the remote, the remote also
// needs to be triggered to close when the connection is closed.
ten_connection_set_on_closed(connection, ten_remote_on_connection_closed,
remote);

if (ctx->cb) {
ctx->cb(self, remote, ctx->user_data);
}
Expand Down
Loading

0 comments on commit 0d2736b

Please sign in to comment.