Skip to content

Commit

Permalink
feat: refine telemetry system in ten_runtime (#640)
Browse files Browse the repository at this point in the history
  • Loading branch information
halajohn authored Feb 4, 2025
1 parent 78d1285 commit a5b1327
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 129 deletions.
2 changes: 1 addition & 1 deletion core/include_internal/ten_runtime/app/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ typedef struct ten_app_t {

#if defined(TEN_ENABLE_TEN_RUST_APIS)
TelemetrySystem *telemetry_system;
MetricHandle *metric_msg_queue_stay_time_us; // micro-seconds.
MetricHandle *metric_extension_thread_msg_queue_stay_time_us;
#endif

void *user_data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ typedef struct TelemetrySystem TelemetrySystem;

#if defined(TEN_ENABLE_TEN_RUST_APIS)

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_record_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t timestamp);
TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_record_extension_thread_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t msg_timestamp);

#endif
2 changes: 1 addition & 1 deletion core/src/ten_runtime/app/app.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ ten_app_t *ten_app_create(ten_app_on_configure_func_t on_configure,

#if defined(TEN_ENABLE_TEN_RUST_APIS)
self->telemetry_system = NULL;
self->metric_msg_queue_stay_time_us = NULL;
self->metric_extension_thread_msg_queue_stay_time_us = NULL;
#endif

self->user_data = NULL;
Expand Down
24 changes: 15 additions & 9 deletions core/src/ten_runtime/app/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ static void ten_app_create_metric(ten_app_t *self) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_app_check_integrity(self, true), "Invalid use of app %p.",
self);
TEN_ASSERT(!self->metric_msg_queue_stay_time_us, "Should not happen.");
TEN_ASSERT(!self->metric_extension_thread_msg_queue_stay_time_us,
"Should not happen.");

if (self->telemetry_system) {
self->metric_msg_queue_stay_time_us = ten_metric_create(
self->telemetry_system, 1, "msg_queue_stay_time",
const char *label_names[] = {"app", "graph", "extension_group"};

self->metric_extension_thread_msg_queue_stay_time_us = ten_metric_create(
self->telemetry_system, 1, "extension_thread_msg_queue_stay_time",
"The duration (in micro-seconds) that a message instance stays in the "
"message queue before being processed.",
NULL, 0);
TEN_ASSERT(self->metric_msg_queue_stay_time_us, "Should not happen.");
"message queue of extension thread before being processed.",
label_names, 3);
TEN_ASSERT(self->metric_extension_thread_msg_queue_stay_time_us,
"Should not happen.");
}
}

Expand All @@ -36,9 +40,11 @@ static void ten_app_destroy_metric(ten_app_t *self) {
TEN_ASSERT(ten_app_check_integrity(self, true),
"Invalid use of extension_thread %p.", self);

if (self->metric_msg_queue_stay_time_us) {
ten_metric_destroy(self->metric_msg_queue_stay_time_us);
self->metric_msg_queue_stay_time_us = NULL;
if (self->metric_extension_thread_msg_queue_stay_time_us) {
TEN_ASSERT(self->telemetry_system, "Should not happen.");

ten_metric_destroy(self->metric_extension_thread_msg_queue_stay_time_us);
self->metric_extension_thread_msg_queue_stay_time_us = NULL;
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/ten_runtime/extension_thread/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ static void ten_extension_thread_handle_in_msg_task(void *self_, void *arg) {

#if defined(TEN_ENABLE_TEN_RUST_APIS)
int64_t timestamp = ten_msg_get_timestamp(msg);
ten_extension_thread_record_msg_queue_stay_time(self, timestamp);
ten_extension_thread_record_extension_thread_msg_queue_stay_time(self,
timestamp);
#endif

switch (ten_extension_thread_get_state(self)) {
Expand Down
70 changes: 54 additions & 16 deletions core/src/ten_runtime/extension_thread/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "include_internal/ten_runtime/app/app.h"
#include "include_internal/ten_runtime/engine/engine.h"
#include "include_internal/ten_runtime/extension_context/extension_context.h"
#include "include_internal/ten_runtime/extension_group/extension_group.h"
#include "include_internal/ten_runtime/extension_thread/extension_thread.h"
#include "include_internal/ten_runtime/extension_thread/telemetry.h"
#include "ten_utils/lib/time.h"
Expand All @@ -19,37 +20,74 @@

#if defined(TEN_ENABLE_TEN_RUST_APIS)

static MetricHandle *ten_extension_thread_get_metric_msg_queue_stay_time_us(
ten_extension_thread_t *self) {
static MetricHandle *
ten_extension_thread_get_metric_extension_thread_msg_queue_stay_time_us(
ten_extension_thread_t *self, const char **app_uri, const char **graph_id,
const char **extension_group_name) {
TEN_ASSERT(self && ten_extension_thread_check_integrity(self, true),
"Invalid argument.");
TEN_ASSERT(app_uri && graph_id && extension_group_name, "Invalid argument.");

*extension_group_name =
ten_extension_group_get_name(self->extension_group, true);

ten_extension_context_t *extension_context = self->extension_context;
TEN_ASSERT(extension_context && ten_extension_context_check_integrity(
extension_context, false),
"Should not happen.");
TEN_ASSERT(
extension_context &&
ten_extension_context_check_integrity(
extension_context,
// When the extension thread is still running, this instance will
// definitely exist, and since the current operation does not
// involve any write actions, it is safe.
false),
"Should not happen.");

ten_engine_t *engine = extension_context->engine;
TEN_ASSERT(engine && ten_engine_check_integrity(engine, false),
"Should not happen.");
TEN_ASSERT(
engine && ten_engine_check_integrity(
engine,
// When the extension thread is still running, this instance
// will definitely exist, and since the current operation
// does not involve any write actions, it is safe.
false),
"Should not happen.");

*graph_id = ten_engine_get_id(engine, false);

ten_app_t *app = engine->app;
TEN_ASSERT(app && ten_app_check_integrity(app, false), "Should not happen.");
TEN_ASSERT(
app && ten_app_check_integrity(
app,
// When the extension thread is still running, this instance
// will definitely exist, and since the current operation does
// not involve any write actions, it is safe.
false),
"Should not happen.");

*app_uri = ten_app_get_uri(app);

return app->metric_msg_queue_stay_time_us;
return app->metric_extension_thread_msg_queue_stay_time_us;
}

void ten_extension_thread_record_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t timestamp) {
void ten_extension_thread_record_extension_thread_msg_queue_stay_time(
ten_extension_thread_t *self, int64_t msg_timestamp) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_extension_thread_check_integrity(self, true),
"Invalid use of extension_thread %p.", self);

int64_t duration_us = ten_current_time_us() - timestamp;
MetricHandle *msg_queue_stay_time =
ten_extension_thread_get_metric_msg_queue_stay_time_us(self);
if (msg_queue_stay_time) {
ten_metric_gauge_set(msg_queue_stay_time, (double)duration_us);
const char *app_uri = NULL;
const char *graph_id = NULL;
const char *extension_group_name = NULL;
MetricHandle *extension_thread_msg_queue_stay_time =
ten_extension_thread_get_metric_extension_thread_msg_queue_stay_time_us(
self, &app_uri, &graph_id, &extension_group_name);
if (extension_thread_msg_queue_stay_time) {
int64_t duration_us = ten_current_time_us() - msg_timestamp;

const char *label_values[] = {app_uri, graph_id, extension_group_name};

ten_metric_gauge_set(extension_thread_msg_queue_stay_time,
(double)duration_us, label_values, 3);
}
}

Expand Down
Loading

0 comments on commit a5b1327

Please sign in to comment.