Skip to content

Commit

Permalink
[refactor]: Refactor Frontend Trace OpenTelemetry Implementation (#7390)
Browse files Browse the repository at this point in the history
Co-authored-by: Iman Tabrizian <[email protected]>
  • Loading branch information
oandreeva-nv and Tabrizian authored Jul 5, 2024
1 parent 52bb23f commit 5f10d61
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 135 deletions.
150 changes: 56 additions & 94 deletions src/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,23 @@ TraceManager::SampleTrace(const TraceStartOptions& start_options)
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
ts->otel_context_ = start_options.propagated_context;
opentelemetry::nostd::shared_ptr<otel_trace_api::Span> root_span;
root_span = ts->StartSpan(
"InferRequest", steady_timestamp_ns, otel_trace_api::kSpanKey);
if (ts->span_stacks_.find(ts->trace_id_) == ts->span_stacks_.end()) {
std::unique_ptr<
std::stack<opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>>
st(new std::stack<
opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>());
ts->span_stacks_.emplace(ts->trace_id_, std::move(st));
}
auto active_span =
otel_trace_api::GetSpan(start_options.propagated_context);
if (active_span->GetContext().IsValid()) {
ts->span_stacks_[ts->trace_id_]->emplace(active_span);
}
// Storing "InferRequest" span as a root span
// to keep it alive for the duration of the request.
ts->otel_context_ = ts->otel_context_.SetValue(kRootSpan, root_span);
ts->root_span_ =
ts->StartSpan("InferRequest", steady_timestamp_ns, ts->trace_id_);
ts->span_stacks_[ts->trace_id_]->emplace(ts->root_span_);
#else
LOG_ERROR << "Unsupported trace mode: "
<< TraceManager::InferenceTraceModeString(ts->setting_->mode_);
Expand All @@ -358,7 +368,7 @@ TraceManager::Trace::~Trace()
setting_->WriteTrace(streams_);
} else if (setting_->mode_ == TRACE_MODE_OPENTELEMETRY) {
#ifndef _WIN32
EndSpan(kRootSpan);
EndSpan(trace_id_);
#else
LOG_ERROR << "Unsupported trace mode: "
<< TraceManager::InferenceTraceModeString(setting_->mode_);
Expand Down Expand Up @@ -390,7 +400,8 @@ TraceManager::Trace::CaptureTimestamp(
<< "{\"name\":\"" << name << "\",\"ns\":" << timestamp_ns << "}]}";
} else if (setting_->mode_ == TRACE_MODE_OPENTELEMETRY) {
#ifndef _WIN32
AddEvent(kRootSpan, name, timestamp_ns);
root_span_->AddEvent(
name, time_offset_ + std::chrono::nanoseconds{timestamp_ns});
#else
LOG_ERROR << "Unsupported trace mode: "
<< TraceManager::InferenceTraceModeString(setting_->mode_);
Expand Down Expand Up @@ -501,15 +512,15 @@ TraceManager::ProcessOpenTelemetryParameters(

void
TraceManager::Trace::StartSpan(
std::string span_key, TRITONSERVER_InferenceTrace* trace,
TRITONSERVER_InferenceTrace* trace,
TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns,
uint64_t trace_id)
{
uint64_t parent_id;
LOG_TRITONSERVER_ERROR(
TRITONSERVER_InferenceTraceParentId(trace, &parent_id),
"getting trace parent id");
std::string parent_span_key = "";
auto span_parent_id = parent_id;

// Currently, only 2 types of sub-spans are supported:
// request span and compute span. Compute span is a leaf span
Expand All @@ -521,16 +532,9 @@ TraceManager::Trace::StartSpan(
// If parent_id > 0, then this is a child trace, spawned from
// the ensemble's main request. For this instance, the parent
// span is the ensemble's request span.
if (parent_id == 0 && activity == TRITONSERVER_TRACE_REQUEST_START) {
parent_span_key = kRootSpan;
} else if (activity == TRITONSERVER_TRACE_REQUEST_START) {
// [FIXME] For BLS requests parent span for children's request spans
// should be parent model's compute span. Currently,
// this won't work, since parent's compute span will be created
// only after children's spans are created.
parent_span_key = kRequestSpan + std::to_string(parent_id);
} else if (activity == TRITONSERVER_TRACE_COMPUTE_START) {
parent_span_key = kRequestSpan + std::to_string(trace_id);
if ((parent_id == 0 && activity == TRITONSERVER_TRACE_REQUEST_START) ||
(activity == TRITONSERVER_TRACE_COMPUTE_START)) {
span_parent_id = trace_id;
}

std::string display_name = "compute";
Expand All @@ -542,7 +546,7 @@ TraceManager::Trace::StartSpan(
display_name = model_name;
}

auto span = StartSpan(display_name, timestamp_ns, parent_span_key);
auto span = StartSpan(display_name, timestamp_ns, span_parent_id);

if (activity == TRITONSERVER_TRACE_REQUEST_START) {
int64_t model_version;
Expand All @@ -564,14 +568,13 @@ TraceManager::Trace::StartSpan(
PrepareTraceContext(span, &buffer);
TRITONSERVER_InferenceTraceSetContext(trace, buffer.Contents().c_str());
}

otel_context_ = otel_context_.SetValue(span_key, span);
span_stacks_[trace_id]->emplace(span);
}

opentelemetry::nostd::shared_ptr<otel_trace_api::Span>
TraceManager::Trace::StartSpan(
std::string display_name, const uint64_t& raw_timestamp_ns,
std::string parent_span_key)
uint64_t trace_id)
{
otel_trace_api::StartSpanOptions options;
options.kind = otel_trace_api::SpanKind::kServer;
Expand All @@ -580,45 +583,37 @@ TraceManager::Trace::StartSpan(
options.start_steady_time =
otel_common::SteadyTimestamp{std::chrono::nanoseconds{raw_timestamp_ns}};

// If the new span is a child span, we need to retrieve its parent from
// the context and provide it through StartSpanOptions to the child span
if (!parent_span_key.empty() && otel_context_.HasKey(parent_span_key)) {
auto parent_span = opentelemetry::nostd::get<
opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>(
otel_context_.GetValue(parent_span_key));
options.parent = parent_span->GetContext();
// If the new span is a child span, we need to retrieve its parent and
// provide it through StartSpanOptions to the child span
if (span_stacks_.find(trace_id) != span_stacks_.end() &&
!span_stacks_[trace_id]->empty()) {
options.parent = span_stacks_[trace_id]->top()->GetContext();
}
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
return provider->GetTracer(kTritonTracer)->StartSpan(display_name, options);
}

void
TraceManager::Trace::EndSpan(std::string span_key)
TraceManager::Trace::EndSpan(uint64_t trace_id)
{
auto timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
EndSpan(span_key, timestamp_ns);
EndSpan(timestamp_ns, trace_id);
}


void
TraceManager::Trace::EndSpan(
std::string span_key, const uint64_t& raw_timestamp_ns)
const uint64_t& raw_timestamp_ns, uint64_t trace_id)
{
if (otel_context_.HasKey(span_key)) {
auto span = opentelemetry::nostd::get<
opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>(
otel_context_.GetValue(span_key));

if (span == nullptr) {
return;
}

if (span_stacks_.find(trace_id) != span_stacks_.end() &&
!span_stacks_[trace_id]->empty()) {
otel_trace_api::EndSpanOptions end_options;
end_options.end_steady_time = otel_common::SteadyTimestamp{
std::chrono::nanoseconds{raw_timestamp_ns}};
span->End(end_options);
span_stacks_[trace_id]->top()->End(end_options);
span_stacks_[trace_id]->pop();
}
}

Expand All @@ -630,79 +625,46 @@ TraceManager::Trace::ReportToOpenTelemetry(
uint64_t id;
LOG_TRITONSERVER_ERROR(
TRITONSERVER_InferenceTraceId(trace, &id), "getting trace id");

auto current_span_key = GetSpanKeyForActivity(activity, id);
if (current_span_key.empty()) {
return;
if (span_stacks_.find(id) == span_stacks_.end()) {
std::unique_ptr<
std::stack<opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>>
st(new std::stack<
opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>());
span_stacks_.emplace(id, std::move(st));
}

AddEvent(current_span_key, trace, activity, timestamp_ns, id);
}

std::string
TraceManager::Trace::GetSpanKeyForActivity(
TRITONSERVER_InferenceTraceActivity activity, uint64_t trace_id)
{
std::string span_name;
switch (activity) {
case TRITONSERVER_TRACE_REQUEST_START:
case TRITONSERVER_TRACE_QUEUE_START:
case TRITONSERVER_TRACE_REQUEST_END: {
span_name = kRequestSpan + std::to_string(trace_id);
break;
}

case TRITONSERVER_TRACE_COMPUTE_START:
case TRITONSERVER_TRACE_COMPUTE_INPUT_END:
case TRITONSERVER_TRACE_COMPUTE_OUTPUT_START:
case TRITONSERVER_TRACE_COMPUTE_END: {
span_name = kComputeSpan + std::to_string(trace_id);
break;
}
case TRITONSERVER_TRACE_TENSOR_QUEUE_INPUT:
case TRITONSERVER_TRACE_TENSOR_BACKEND_INPUT:
case TRITONSERVER_TRACE_TENSOR_BACKEND_OUTPUT:
default: {
LOG_ERROR << "Unsupported activity: "
<< TRITONSERVER_InferenceTraceActivityString(activity);
span_name = "";
break;
}
}

return span_name;
AddEvent(trace, activity, timestamp_ns, id);
}

void
TraceManager::Trace::AddEvent(
std::string span_key, TRITONSERVER_InferenceTrace* trace,
TRITONSERVER_InferenceTrace* trace,
TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns,
uint64_t id)
uint64_t trace_id)
{
if (activity == TRITONSERVER_TRACE_REQUEST_START ||
activity == TRITONSERVER_TRACE_COMPUTE_START) {
StartSpan(span_key, trace, activity, timestamp_ns, id);
StartSpan(trace, activity, timestamp_ns, trace_id);
}

AddEvent(
span_key, TRITONSERVER_InferenceTraceActivityString(activity),
timestamp_ns);
TRITONSERVER_InferenceTraceActivityString(activity), timestamp_ns,
trace_id);

if (activity == TRITONSERVER_TRACE_REQUEST_END ||
activity == TRITONSERVER_TRACE_COMPUTE_END) {
EndSpan(span_key, timestamp_ns);
EndSpan(timestamp_ns, trace_id);
}
}

void
TraceManager::Trace::AddEvent(
std::string span_key, std::string event, uint64_t timestamp)
const std::string& event, uint64_t timestamp, uint64_t trace_id)
{
if (otel_context_.HasKey(span_key)) {
auto span = opentelemetry::nostd::get<
opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>(
otel_context_.GetValue(span_key));
span->AddEvent(event, time_offset_ + std::chrono::nanoseconds{timestamp});
if (span_stacks_.find(trace_id) != span_stacks_.end() &&
!span_stacks_[trace_id]->empty()) {
span_stacks_[trace_id]->top()->AddEvent(
event, time_offset_ + std::chrono::nanoseconds{timestamp});
}
}

Expand Down
Loading

0 comments on commit 5f10d61

Please sign in to comment.