Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refine telemetry system in ten_runtime #640

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/include_internal/ten_runtime/app/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ typedef struct ten_app_t {

#if defined(TEN_ENABLE_TEN_RUST_APIS)
TelemetrySystem *telemetry_system;
MetricHandle *metric_msg_queue_stay_time_us; // micro-seconds.
MetricHandle *metric_extension_thread_msg_queue_stay_time_us;
#endif

void *user_data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ typedef struct TelemetrySystem TelemetrySystem;

#if defined(TEN_ENABLE_TEN_RUST_APIS)

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_record_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t timestamp);
TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_record_extension_thread_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t msg_timestamp);

#endif
2 changes: 1 addition & 1 deletion core/src/ten_runtime/app/app.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ ten_app_t *ten_app_create(ten_app_on_configure_func_t on_configure,

#if defined(TEN_ENABLE_TEN_RUST_APIS)
self->telemetry_system = NULL;
self->metric_msg_queue_stay_time_us = NULL;
self->metric_extension_thread_msg_queue_stay_time_us = NULL;
#endif

self->user_data = NULL;
Expand Down
24 changes: 15 additions & 9 deletions core/src/ten_runtime/app/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ static void ten_app_create_metric(ten_app_t *self) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_app_check_integrity(self, true), "Invalid use of app %p.",
self);
TEN_ASSERT(!self->metric_msg_queue_stay_time_us, "Should not happen.");
TEN_ASSERT(!self->metric_extension_thread_msg_queue_stay_time_us,
"Should not happen.");

if (self->telemetry_system) {
self->metric_msg_queue_stay_time_us = ten_metric_create(
self->telemetry_system, 1, "msg_queue_stay_time",
const char *label_names[] = {"app", "graph", "extension_group"};

self->metric_extension_thread_msg_queue_stay_time_us = ten_metric_create(
self->telemetry_system, 1, "extension_thread_msg_queue_stay_time",
"The duration (in micro-seconds) that a message instance stays in the "
"message queue before being processed.",
NULL, 0);
TEN_ASSERT(self->metric_msg_queue_stay_time_us, "Should not happen.");
"message queue of extension thread before being processed.",
label_names, 3);
TEN_ASSERT(self->metric_extension_thread_msg_queue_stay_time_us,
"Should not happen.");
}
}

Expand All @@ -36,9 +40,11 @@ static void ten_app_destroy_metric(ten_app_t *self) {
TEN_ASSERT(ten_app_check_integrity(self, true),
"Invalid use of extension_thread %p.", self);

if (self->metric_msg_queue_stay_time_us) {
ten_metric_destroy(self->metric_msg_queue_stay_time_us);
self->metric_msg_queue_stay_time_us = NULL;
if (self->metric_extension_thread_msg_queue_stay_time_us) {
TEN_ASSERT(self->telemetry_system, "Should not happen.");

ten_metric_destroy(self->metric_extension_thread_msg_queue_stay_time_us);
self->metric_extension_thread_msg_queue_stay_time_us = NULL;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ static void ten_extension_thread_handle_in_msg_task(void *self_, void *arg) {

#if defined(TEN_ENABLE_TEN_RUST_APIS)
int64_t timestamp = ten_msg_get_timestamp(msg);
ten_extension_thread_record_msg_queue_stay_time(self, timestamp);
ten_extension_thread_record_extension_thread_msg_queue_stay_time(self,
timestamp);
#endif

switch (ten_extension_thread_get_state(self)) {
Expand Down
70 changes: 54 additions & 16 deletions core/src/ten_runtime/extension_thread/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "include_internal/ten_runtime/app/app.h"
#include "include_internal/ten_runtime/engine/engine.h"
#include "include_internal/ten_runtime/extension_context/extension_context.h"
#include "include_internal/ten_runtime/extension_group/extension_group.h"
#include "include_internal/ten_runtime/extension_thread/extension_thread.h"
#include "include_internal/ten_runtime/extension_thread/telemetry.h"
#include "ten_utils/lib/time.h"
Expand All @@ -19,37 +20,74 @@

#if defined(TEN_ENABLE_TEN_RUST_APIS)

static MetricHandle *ten_extension_thread_get_metric_msg_queue_stay_time_us(
ten_extension_thread_t *self) {
static MetricHandle *
ten_extension_thread_get_metric_extension_thread_msg_queue_stay_time_us(
ten_extension_thread_t *self, const char **app_uri, const char **graph_id,
const char **extension_group_name) {
TEN_ASSERT(self && ten_extension_thread_check_integrity(self, true),
"Invalid argument.");
TEN_ASSERT(app_uri && graph_id && extension_group_name, "Invalid argument.");

*extension_group_name =
ten_extension_group_get_name(self->extension_group, true);

ten_extension_context_t *extension_context = self->extension_context;
TEN_ASSERT(extension_context && ten_extension_context_check_integrity(
extension_context, false),
"Should not happen.");
TEN_ASSERT(
extension_context &&
ten_extension_context_check_integrity(
extension_context,
// When the extension thread is still running, this instance will
// definitely exist, and since the current operation does not
// involve any write actions, so it is safe.
false),
"Should not happen.");

ten_engine_t *engine = extension_context->engine;
TEN_ASSERT(engine && ten_engine_check_integrity(engine, false),
"Should not happen.");
TEN_ASSERT(
engine && ten_engine_check_integrity(
engine,
// When the extension thread is still running, this instance
// will definitely exist, and since the current operation
// does not involve any write actions, so it is safe.
false),
"Should not happen.");

*graph_id = ten_engine_get_id(engine, false);

ten_app_t *app = engine->app;
TEN_ASSERT(app && ten_app_check_integrity(app, false), "Should not happen.");
TEN_ASSERT(
app && ten_app_check_integrity(
app,
// When the extension thread is still running, this instance
// will definitely exist, and since the current operation does
// not involve any write actions, so it is safe.
false),
"Should not happen.");

*app_uri = ten_app_get_uri(app);

return app->metric_msg_queue_stay_time_us;
return app->metric_extension_thread_msg_queue_stay_time_us;
}

void ten_extension_thread_record_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t timestamp) {
void ten_extension_thread_record_extension_thread_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t msg_timestamp) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
"Invalid use of extension_thread %p.", self);

int64_t duration_us = ten_current_time_us() - timestamp;
MetricHandle *msg_queue_stay_time =
ten_extension_thread_get_metric_msg_queue_stay_time_us(self);
if (msg_queue_stay_time) {
ten_metric_gauge_set(msg_queue_stay_time, (double)duration_us);
const char *app_uri = NULL;
const char *graph_id = NULL;
const char *extension_group_name = NULL;
MetricHandle *extension_thread_msg_queue_stay_time =
ten_extension_thread_get_metric_extension_thread_msg_queue_stay_time_us(
self, &app_uri, &graph_id, &extension_group_name);
if (extension_thread_msg_queue_stay_time) {
int64_t duration_us = ten_current_time_us() - msg_timestamp;

const char *label_values[] = {app_uri, graph_id, extension_group_name};

ten_metric_gauge_set(extension_thread_msg_queue_stay_time,
(double)duration_us, label_values, 3);
}
}

Expand Down
Loading