Skip to content

Commit

Permalink
[EXPORTER] Async exporting for otlp grpc (open-telemetry#2407)
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Feb 8, 2024
1 parent cf5cdaa commit 6e8f7c4
Show file tree
Hide file tree
Showing 21 changed files with 1,151 additions and 123 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Increment the:

## [Unreleased]

* [EXPORTER] Add async exporting for OTLP/GRPC exporter
[#2407](https://github.com/open-telemetry/opentelemetry-cpp/pull/2407)
* [API] Fix b3, w3c and jaeger propagators: they will not overwrite
the active span with a default invalid span, which is especially useful
when used with CompositePropagator
Expand Down
3 changes: 3 additions & 0 deletions api/include/opentelemetry/common/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,19 @@ point.
#if defined(__clang__)

# define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default")))
# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden")))

#elif defined(__GNUC__)

# define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default")))
# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden")))

#else

/* Add support for other compilers here. */

# define OPENTELEMETRY_API_SINGLETON
# define OPENTELEMETRY_LOCAL_SYMBOL

#endif

Expand Down
118 changes: 107 additions & 11 deletions exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@

#pragma once

#include <grpcpp/completion_queue.h>
#include <grpcpp/grpcpp.h>

#include <atomic>
#include <memory>

#include "opentelemetry/sdk/common/exporter_utils.h"

#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"

#include "google/protobuf/arena.h"
#include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
Expand All @@ -25,12 +30,20 @@ namespace otlp

struct OtlpGrpcClientOptions;

#ifdef ENABLE_ASYNC_EXPORT
struct OtlpGrpcClientAsyncData;
#endif

/**
* The OTLP gRPC client contains utility functions of gRPC.
*/
class OtlpGrpcClient
{
public:
OtlpGrpcClient();

~OtlpGrpcClient();

/**
* Create gRPC channel from the exporter options.
*/
Expand All @@ -42,11 +55,6 @@ class OtlpGrpcClient
static std::unique_ptr<grpc::ClientContext> MakeClientContext(
const OtlpGrpcClientOptions &options);

/**
* Create gRPC CompletionQueue to async call RPC.
*/
static std::unique_ptr<grpc::CompletionQueue> MakeCompletionQueue();

/**
* Create trace service stub to communicate with the OpenTelemetry Collector.
*/
Expand All @@ -67,21 +75,109 @@ class OtlpGrpcClient

static grpc::Status DelegateExport(
proto::collector::trace::v1::TraceService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::trace::v1::ExportTraceServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest &&request,
proto::collector::trace::v1::ExportTraceServiceResponse *response);

static grpc::Status DelegateExport(
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::metrics::v1::ExportMetricsServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest &&request,
proto::collector::metrics::v1::ExportMetricsServiceResponse *response);

static grpc::Status DelegateExport(
proto::collector::logs::v1::LogsService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::logs::v1::ExportLogsServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
proto::collector::logs::v1::ExportLogsServiceResponse *response);

#ifdef ENABLE_ASYNC_EXPORT

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::trace::v1::TraceService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::trace::v1::ExportTraceServiceRequest &,
proto::collector::trace::v1::ExportTraceServiceResponse *)>
&&result_callback) noexcept;

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::metrics::v1::ExportMetricsServiceRequest &,
proto::collector::metrics::v1::ExportMetricsServiceResponse *)>
&&result_callback) noexcept;

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::logs::v1::LogsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::logs::v1::ExportLogsServiceRequest &,
proto::collector::logs::v1::ExportLogsServiceResponse *)>
&&result_callback) noexcept;

/**
* Force flush the gRPC client.
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* Shut down the gRPC client.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
* @return return the status of this operation
*/
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept;

std::shared_ptr<OtlpGrpcClientAsyncData> MutableAsyncData(const OtlpGrpcClientOptions &options);

private:
// Stores if this gRPC client had its Shutdown() method called
std::atomic<bool> is_shutdown_;

// Stores shared data between threads of this gRPC client
std::shared_ptr<OtlpGrpcClientAsyncData> async_data_;
#endif
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ struct OtlpGrpcClientOptions

/** User agent. */
std::string user_agent;

/** max number of threads that can be allocated from this */
std::size_t max_threads;

#ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
std::size_t max_concurrent_requests;
#endif
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format.
*/
Expand Down Expand Up @@ -73,6 +75,10 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
// The configuration options associated with this exporter.
const OtlpGrpcExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// For testing
friend class OtlpGrpcExporterTestPeer;
friend class OtlpGrpcLogRecordExporterTestPeer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format in gRPC.
*/
Expand Down Expand Up @@ -73,6 +75,10 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo
// Configuration options for the exporter
const OtlpGrpcLogRecordExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// For testing
friend class OtlpGrpcLogRecordExporterTestPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports metrics data in OpenTelemetry Protocol (OTLP) format in gRPC.
*/
Expand Down Expand Up @@ -59,6 +61,10 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp
// The configuration options associated with this exporter.
const OtlpGrpcMetricExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// Aggregation Temporality selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

Expand Down
Loading

0 comments on commit 6e8f7c4

Please sign in to comment.