Skip to content

Commit

Permalink
refactor: remove original_connection field and related logic from com…
Browse files Browse the repository at this point in the history
…mand handling
  • Loading branch information
halajohn committed Feb 22, 2025
1 parent 0d2736b commit a039570
Show file tree
Hide file tree
Showing 14 changed files with 42 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=ExtensionTest.WrongEngineThenCorrectInMigration"
"--gtest_filter=ExtensionTest.GraphMultiplePolygon"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ TEN_RUNTIME_PRIVATE_API bool ten_engine_check_remote_is_duplicated(
TEN_RUNTIME_PRIVATE_API bool ten_engine_check_remote_is_weak(
ten_engine_t *self, ten_remote_t *remote);

TEN_RUNTIME_PRIVATE_API ten_remote_t *ten_engine_find_weak_remote(
ten_engine_t *self, const char *uri);

TEN_RUNTIME_PRIVATE_API void ten_engine_on_remote_closed(ten_remote_t *remote,
void *on_closed_data);

Expand Down
17 changes: 0 additions & 17 deletions core/include_internal/ten_runtime/msg/cmd_base/cmd_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ typedef struct ten_cmd_base_t {
ten_value_t cmd_id; // string. This is used in TEN runtime internally.
ten_value_t seq_id; // string. This is used in TEN client.

// The origin where the command is originated.
//
// This is kind of a cache to enable us not to loop all the remotes to find
// the correct one.
//
// If any remote of an engine is closed, then it will trigger the closing of
// the engine, and no cmds could be processed any further. So we don't need
// to use sharedptr to wrap the following variable, because when a command
// is being processed, the origin must be alive.
ten_connection_t *original_connection; // =-=-= 可以拿掉?

ten_env_transfer_msg_result_handler_func_t result_handler;
void *result_handler_data;
} ten_cmd_base_t;
Expand Down Expand Up @@ -102,12 +91,6 @@ TEN_RUNTIME_PRIVATE_API void ten_raw_cmd_base_set_seq_id(ten_cmd_base_t *self,
TEN_RUNTIME_PRIVATE_API bool ten_cmd_base_cmd_id_is_empty(
ten_shared_ptr_t *self);

TEN_RUNTIME_PRIVATE_API ten_connection_t *ten_cmd_base_get_original_connection(
ten_shared_ptr_t *self);

TEN_RUNTIME_PRIVATE_API void ten_cmd_base_set_original_connection(
ten_shared_ptr_t *self, ten_connection_t *connection);

TEN_RUNTIME_PRIVATE_API const char *ten_cmd_base_get_parent_cmd_id(
ten_shared_ptr_t *self);

Expand Down
2 changes: 0 additions & 2 deletions core/include_internal/ten_runtime/msg/cmd_base/field/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ typedef enum TEN_CMD_BASE_FIELD {
TEN_CMD_BASE_FIELD_CMD_ID,
TEN_CMD_BASE_FIELD_SEQ_ID,

TEN_CMD_BASE_FIELD_ORIGINAL_CONNECTION,

TEN_CMD_BASE_FIELD_RESPONSE_HANDLER,
TEN_CMD_BASE_FIELD_RESPONSE_HANDLER_DATA,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "include_internal/ten_runtime/common/constant_str.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/cmd_id.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/field.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/original_connection.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/result_handler.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/result_handler_data.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/seq_id.h"
Expand Down Expand Up @@ -48,14 +47,6 @@ static const ten_msg_field_info_t ten_cmd_base_fields_info[] = {
.copy_field = ten_cmd_base_copy_seq_id,
.process_field = ten_cmd_base_process_seq_id,
},
[TEN_CMD_BASE_FIELD_ORIGINAL_CONNECTION] =
{
.field_name = NULL,
.field_id =
TEN_MSG_FIELD_LAST + TEN_CMD_BASE_FIELD_ORIGINAL_CONNECTION,
.copy_field = ten_cmd_base_copy_original_connection,
.process_field = NULL,
},
[TEN_CMD_BASE_FIELD_RESPONSE_HANDLER] =
{
.field_name = NULL,
Expand Down

This file was deleted.

3 changes: 1 addition & 2 deletions core/src/ten_runtime/app/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ static void ten_app_handle_in_msgs_sync(ten_app_t *self) {
// now.
//
// - Some cmds are sent from the extensions in the engine, and the receiver
// is the app, ex: the 'close_app' cmd. The value of the cmd's
// 'original_connection' field is NULL in this case.
// is the app, ex: the 'close_app' cmd.
//
// - Some cmds are sent from one engine, and the receiver is another engine
// in the app. The value of the cmd's 'origin_connection' field might or
Expand Down
6 changes: 1 addition & 5 deletions core/src/ten_runtime/connection/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,6 @@ void ten_connection_on_msgs(ten_connection_t *self, ten_list_t *msgs) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);

if (ten_msg_is_cmd_and_result(msg)) {
// For a command message, remember which connection this command is coming
// from.
ten_cmd_base_set_original_connection(msg, self);

// If this command is coming from outside of the TEN world (i.e.,
// clients), the command ID would be empty, so we generate a new one for
// it in this case now.
Expand Down Expand Up @@ -470,7 +466,7 @@ ten_runloop_t *ten_connection_get_attached_runloop(ten_connection_t *self) {
// function has been called which means the migration is completed). Refer to
// 'ten_protocol_asynced_on_input_async()'.

switch (ten_atomic_load(&self->attach_to)) {
switch (ten_connection_attach_to(self)) {
case TEN_CONNECTION_ATTACH_TO_REMOTE:
return ten_remote_get_attached_runloop(self->attached_target.remote);
case TEN_CONNECTION_ATTACH_TO_ENGINE:
Expand Down
3 changes: 0 additions & 3 deletions core/src/ten_runtime/engine/internal/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ void ten_engine_on_connection_cleaned_async(ten_engine_t *self,
"Access across threads.");
TEN_ASSERT(cmd && ten_msg_check_integrity(cmd), "Invalid argument.");

// TODO(Liu): The 'connection' should be the 'original_connection' of the
// 'cmd', so we can use the 'cmd' as the parameter directly. But we have to
// refine the 'ten_app_on_msg()' first.
ten_engine_migration_user_data_t *user_data =
ten_engine_migration_user_data_create(connection, cmd);

Expand Down
20 changes: 20 additions & 0 deletions core/src/ten_runtime/engine/internal/remote_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ static bool ten_engine_del_weak_remote(ten_engine_t *self,
return success;
}

ten_remote_t *ten_engine_find_weak_remote(ten_engine_t *self, const char *uri) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_engine_check_integrity(self, true),
"Invalid use of engine %p.", self);

ten_list_foreach (&self->weak_remotes, iter) {
ten_remote_t *remote = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(remote, "Invalid argument.");
TEN_ASSERT(ten_remote_check_integrity(remote, true),
"Invalid use of remote %p.", remote);

if (ten_string_is_equal_c_str(&remote->uri, uri)) {
return remote;
}
}

return NULL;
}

static size_t ten_engine_weak_remotes_cnt_in_specified_uri(ten_engine_t *self,
const char *uri) {
TEN_ASSERT(self, "Invalid argument.");
Expand Down Expand Up @@ -389,6 +408,7 @@ static void ten_engine_connect_to_remote_after_remote_is_created(
ten_msg_check_integrity(start_graph_cmd_for_the_remote),
"Invalid argument.");

// A simple sanity check:
// Before starting to connect to more apps in the whole start_graph process,
// `original_start_graph_cmd_of_enabling_engine` must be set. Otherwise,
// after the entire process is completed, there will be no way to determine
Expand Down
39 changes: 15 additions & 24 deletions core/src/ten_runtime/engine/msg_interface/cmd_result.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,16 @@ static bool ten_engine_close_duplicated_remote_or_upgrade_it_to_normal(
ten_cmd_base_check_integrity(cmd_result),
"Should not happen.");

ten_connection_t *connection =
ten_cmd_base_get_original_connection(cmd_result);
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true) &&
ten_connection_attach_to(connection) ==
TEN_CONNECTION_ATTACH_TO_REMOTE,
"Should not happen.");

ten_remote_t *remote = connection->attached_target.remote;
TEN_ASSERT(remote, "Invalid argument.");
TEN_ASSERT(ten_remote_check_integrity(remote, true),
"Invalid use of remote %p.", remote);
ten_remote_t *weak_remote =
ten_engine_find_weak_remote(self, ten_msg_get_src_app_uri(cmd_result));
if (weak_remote == NULL) {
// Only if the 'start_graph' flow involves a connection, we need to handle
// situations relevant to that connection.
return true;
}

TEN_ASSERT(ten_engine_check_remote_is_weak(self, remote),
"%p should be a weak remote.", remote);
TEN_ASSERT(ten_remote_check_integrity(weak_remote, true),
"Invalid use of remote %p.", weak_remote);

ten_string_t detail_str;
ten_string_init(&detail_str);
Expand All @@ -63,10 +59,10 @@ static bool ten_engine_close_duplicated_remote_or_upgrade_it_to_normal(

if (ten_string_is_equal_c_str(&detail_str, TEN_STR_DUPLICATE)) {
TEN_LOGW("Receives a 'duplicate' result from %s",
ten_string_get_raw_str(&remote->uri));
ten_string_get_raw_str(&weak_remote->uri));

// This is a duplicated channel, closing it now.
ten_connection_t *connection = remote->connection;
ten_connection_t *connection = weak_remote->connection;
TEN_ASSERT(connection && ten_connection_check_integrity(connection, true),
"Should not happen.");

Expand All @@ -76,7 +72,7 @@ static bool ten_engine_close_duplicated_remote_or_upgrade_it_to_normal(
} else {
// The 'start_graph' is done, change this remote from weak-type to
// normal-type.
ten_engine_upgrade_weak_remote_to_normal_remote(self, remote);
ten_engine_upgrade_weak_remote_to_normal_remote(self, weak_remote);
}

ten_string_deinit(&detail_str);
Expand All @@ -99,14 +95,9 @@ static bool ten_engine_handle_cmd_result_for_cmd_start_graph(
"Should not happen.");

if (ten_cmd_result_get_status_code(cmd_result) == TEN_STATUS_CODE_OK) {
if (ten_cmd_base_get_original_connection(cmd_result)) {
// Only if the 'start_graph' flow involves a connection, we need to handle
// situations relevant to that connection.

bool rc = ten_engine_close_duplicated_remote_or_upgrade_it_to_normal(
self, cmd_result, err);
TEN_ASSERT(rc, "Should not happen.");
}
bool rc = ten_engine_close_duplicated_remote_or_upgrade_it_to_normal(
self, cmd_result, err);
TEN_ASSERT(rc, "Should not happen.");
}

// Find the corresponding OUT path of the cmd_result.
Expand Down
35 changes: 0 additions & 35 deletions core/src/ten_runtime/msg/cmd_base/cmd_base.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
#include <sys/types.h>

#include "include_internal/ten_runtime/common/constant_str.h"
#include "include_internal/ten_runtime/connection/connection.h"
#include "include_internal/ten_runtime/msg/cmd_base/field/field_info.h"
#include "include_internal/ten_runtime/msg/field/field_info.h"
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_runtime/remote/remote.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/lib/string.h"
#include "ten_utils/lib/uuid.h"
Expand Down Expand Up @@ -59,8 +57,6 @@ static void ten_raw_cmd_base_init_empty(ten_cmd_base_t *self) {
ten_value_init_string(&self->cmd_id);
ten_value_init_string(&self->seq_id);

self->original_connection = NULL;

self->result_handler = NULL;
self->result_handler_data = NULL;
}
Expand Down Expand Up @@ -120,8 +116,6 @@ void ten_raw_cmd_base_deinit(ten_cmd_base_t *self) {
ten_value_deinit(&self->seq_id);

ten_string_deinit(&self->parent_cmd_id);

self->original_connection = NULL;
}

void ten_raw_cmd_base_copy_field(ten_msg_t *self, ten_msg_t *src,
Expand Down Expand Up @@ -253,35 +247,6 @@ bool ten_cmd_base_cmd_id_is_empty(ten_shared_ptr_t *self) {
return ten_raw_cmd_base_cmd_id_is_empty(ten_shared_ptr_get_data(self));
}

static ten_connection_t *ten_raw_cmd_base_get_origin_connection(
ten_cmd_base_t *self) {
TEN_ASSERT(self && ten_raw_cmd_base_check_integrity(self),
"Should not happen.");
return self->original_connection;
}

ten_connection_t *ten_cmd_base_get_original_connection(ten_shared_ptr_t *self) {
TEN_ASSERT(self && ten_cmd_base_check_integrity(self), "Should not happen.");
return ten_raw_cmd_base_get_origin_connection(ten_shared_ptr_get_data(self));
}

static void ten_raw_cmd_base_set_origin_connection(
ten_cmd_base_t *self, ten_connection_t *connection) {
TEN_ASSERT(self && ten_raw_cmd_base_check_integrity(self) && connection &&
ten_connection_check_integrity(connection, true),
"Should not happen.");

self->original_connection = connection;
}

void ten_cmd_base_set_original_connection(ten_shared_ptr_t *self,
ten_connection_t *connection) {
TEN_ASSERT(self && ten_cmd_base_check_integrity(self) && connection,
"Should not happen.");
ten_raw_cmd_base_set_origin_connection(ten_shared_ptr_get_data(self),
connection);
}

const char *ten_cmd_base_get_cmd_id(ten_shared_ptr_t *self) {
TEN_ASSERT(self && ten_cmd_base_check_integrity(self), "Should not happen.");
return ten_string_get_raw_str(
Expand Down
22 changes: 0 additions & 22 deletions core/src/ten_runtime/msg/cmd_base/field/original_connection.c

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ static void ten_stream_on_data(ten_stream_t *stream, void *data, int size) {
// called from the app thread (because the engine doesn't have its own
// engine thread in this case). If the protocol (i.e., the
// 'ten_protocol_t' object) is closed synchronized, the connection maybe
// destroyed before the 'close_app' cmd (in step 2) is executed. As the
// 'close_app' cmd is sent from the client, the 'original_connection'
// field of the 'close_app' cmd is not NULL, then there will be memory
// access issue when handling the cmd.
// destroyed before the 'close_app' cmd (in step 2) is executed.
//
// So the closing of the protocol should always be an _async_ operation, as
// the stream is being closed and there is no chance to receive data from
Expand Down

0 comments on commit a039570

Please sign in to comment.