Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add grpc response struct for all signals instead of returning interface #3437

Merged
merged 1 commit into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ type mockTracesReceiver struct {
lastRequest pdata.Traces
}

func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (pdatagrpc.TracesResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(td.SpanCount()))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = td
r.metadata, _ = metadata.FromIncomingContext(ctx)
return nil, nil
return pdatagrpc.NewTracesResponse(), nil
}

func (r *mockTracesReceiver) GetLastRequest() pdata.Traces {
Expand Down Expand Up @@ -92,14 +92,14 @@ type mockLogsReceiver struct {
lastRequest pdata.Logs
}

func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (pdatagrpc.LogsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(ld.LogRecordCount()))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = ld
r.metadata, _ = metadata.FromIncomingContext(ctx)
return nil, nil
return pdatagrpc.NewLogsResponse(), nil
}

func (r *mockLogsReceiver) GetLastRequest() pdata.Logs {
Expand Down Expand Up @@ -129,15 +129,15 @@ type mockMetricsReceiver struct {
lastRequest pdata.Metrics
}

func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (pdatagrpc.MetricsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
_, recordCount := md.MetricAndDataPointCount()
atomic.AddInt32(&r.totalItems, int32(recordCount))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = md
r.metadata, _ = metadata.FromIncomingContext(ctx)
return nil, nil
return pdatagrpc.NewMetricsResponse(), nil
}

func (r *mockMetricsReceiver) GetLastRequest() pdata.Metrics {
Expand Down
27 changes: 18 additions & 9 deletions internal/pdatagrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
)

// TODO: Consider to add `LogsRequest` and `LogsResponse`. Right now the funcs return interface{},
// it would be better and future proof to create a LogsResponse empty struct and return that.
// So if we ever add things in the OTLP response I can deal with that. Similar for request if we add non pdata properties.
// TODO: Consider to add `LogsRequest`. If we add non pdata properties we can add them to the request.

// LogsResponse represents the response for gRPC client/server.
type LogsResponse struct {
orig *otlpcollectorlogs.ExportLogsServiceResponse
}

// NewLogsResponse returns an empty LogsResponse.
func NewLogsResponse() LogsResponse {
return LogsResponse{orig: &otlpcollectorlogs.ExportLogsServiceResponse{}}
}

// LogsClient is the client API for OTLP-GRPC Logs service.
//
Expand All @@ -36,7 +44,7 @@ type LogsClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (interface{}, error)
Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error)
}

type logsClient struct {
Expand All @@ -48,8 +56,9 @@ func NewLogsClient(cc *grpc.ClientConn) LogsClient {
return &logsClient{rawClient: otlpcollectorlogs.NewLogsServiceClient(cc)}
}

func (c *logsClient) Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (interface{}, error) {
return c.rawClient.Export(ctx, internal.LogsToOtlp(in.InternalRep()), opts...)
func (c *logsClient) Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.LogsToOtlp(in.InternalRep()), opts...)
return LogsResponse{orig: rsp}, err
}

// LogsServer is the server API for OTLP gRPC LogsService service.
Expand All @@ -58,7 +67,7 @@ type LogsServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Logs) (interface{}, error)
Export(context.Context, pdata.Logs) (LogsResponse, error)
}

// RegisterLogsServer registers the LogsServer to the grpc.Server.
Expand All @@ -71,6 +80,6 @@ type rawLogsServer struct {
}

func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlogs.ExportLogsServiceRequest) (*otlpcollectorlogs.ExportLogsServiceResponse, error) {
_, err := s.srv.Export(ctx, pdata.LogsFromInternalRep(internal.LogsFromOtlp(request)))
return &otlpcollectorlogs.ExportLogsServiceResponse{}, err
rsp, err := s.srv.Export(ctx, pdata.LogsFromInternalRep(internal.LogsFromOtlp(request)))
return rsp.orig, err
}
27 changes: 18 additions & 9 deletions internal/pdatagrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
)

// TODO: Consider to add `MetricsRequest` and `MetricsResponse`. Right now the funcs return interface{},
// it would be better and future proof to create a MetricsResponse empty struct and return that.
// So if we ever add things in the OTLP response I can deal with that. Similar for request if we add non pdata properties.
// TODO: Consider to add `MetricsRequest`. If we add non pdata properties we can add them to the request.

// MetricsResponse represents the response for gRPC client/server.
type MetricsResponse struct {
orig *otlpcollectormetrics.ExportMetricsServiceResponse
}

// NewMetricsResponse returns an empty MetricsResponse.
func NewMetricsResponse() MetricsResponse {
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
}

// MetricsClient is the client API for OTLP-GRPC Metrics service.
//
Expand All @@ -36,7 +44,7 @@ type MetricsClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (interface{}, error)
Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error)
}

type metricsClient struct {
Expand All @@ -48,8 +56,9 @@ func NewMetricsClient(cc *grpc.ClientConn) MetricsClient {
return &metricsClient{rawClient: otlpcollectormetrics.NewMetricsServiceClient(cc)}
}

func (c *metricsClient) Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (interface{}, error) {
return c.rawClient.Export(ctx, internal.MetricsToOtlp(in.InternalRep()), opts...)
func (c *metricsClient) Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.MetricsToOtlp(in.InternalRep()), opts...)
return MetricsResponse{orig: rsp}, err
}

// MetricsServer is the server API for OTLP gRPC MetricsService service.
Expand All @@ -58,7 +67,7 @@ type MetricsServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Metrics) (interface{}, error)
Export(context.Context, pdata.Metrics) (MetricsResponse, error)
}

// RegisterMetricsServer registers the MetricsServer to the grpc.Server.
Expand All @@ -71,6 +80,6 @@ type rawMetricsServer struct {
}

func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
_, err := s.srv.Export(ctx, pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(request)))
return &otlpcollectormetrics.ExportMetricsServiceResponse{}, err
rsp, err := s.srv.Export(ctx, pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(request)))
return rsp.orig, err
}
27 changes: 18 additions & 9 deletions internal/pdatagrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
otlpcollectortraces "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

// TODO: Consider to add `TracesRequest` and `TracesResponse`. Right now the funcs return interface{},
// it would be better and future proof to create a TracesResponse empty struct and return that.
// So if we ever add things in the OTLP response I can deal with that. Similar for request if we add non pdata properties.
// TODO: Consider to add `TracesRequest`. If we add non pdata properties we can add them to the request.

// TracesResponse represents the response for gRPC client/server.
type TracesResponse struct {
orig *otlpcollectortraces.ExportTraceServiceResponse
}

// NewTracesResponse returns an empty TracesResponse.
func NewTracesResponse() TracesResponse {
return TracesResponse{orig: &otlpcollectortraces.ExportTraceServiceResponse{}}
}

// TracesClient is the client API for OTLP-GRPC Traces service.
//
Expand All @@ -36,7 +44,7 @@ type TracesClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (interface{}, error)
Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error)
}

type tracesClient struct {
Expand All @@ -49,8 +57,9 @@ func NewTracesClient(cc *grpc.ClientConn) TracesClient {
}

// Export implements the TracesClient interface.
func (c *tracesClient) Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (interface{}, error) {
return c.rawClient.Export(ctx, internal.TracesToOtlp(in.InternalRep()), opts...)
func (c *tracesClient) Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.TracesToOtlp(in.InternalRep()), opts...)
return TracesResponse{orig: rsp}, err
}

// TracesServer is the server API for OTLP gRPC TracesService service.
Expand All @@ -59,7 +68,7 @@ type TracesServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Traces) (interface{}, error)
Export(context.Context, pdata.Traces) (TracesResponse, error)
}

// RegisterTracesServer registers the TracesServer to the grpc.Server.
Expand All @@ -73,6 +82,6 @@ type rawTracesServer struct {

func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortraces.ExportTraceServiceRequest) (*otlpcollectortraces.ExportTraceServiceResponse, error) {
internal.TracesCompatibilityChanges(request)
_, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
return &otlpcollectortraces.ExportTraceServiceResponse{}, err
rsp, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
return rsp.orig, err
}
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -53,15 +54,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "log")

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (pdatagrpc.LogsResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return nil, err
return pdatagrpc.LogsResponse{}, err
}

return nil, nil
return pdatagrpc.NewLogsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestExport_ErrorConsumer(t *testing.T) {

resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
assert.Equal(t, pdatagrpc.LogsResponse{}, resp)
}

func makeLogsServiceClient(addr net.Addr) (pdatagrpc.LogsClient, func(), error) {
Expand Down
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -52,14 +53,14 @@ const (
var receiverID = config.NewIDWithName("otlp", "metrics")

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (pdatagrpc.MetricsResponse, error) {
receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(receiverCtx, md)
if err != nil {
return nil, err
return pdatagrpc.MetricsResponse{}, err
}

return nil, nil
return pdatagrpc.NewMetricsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestExport_ErrorConsumer(t *testing.T) {

resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
assert.Equal(t, pdatagrpc.MetricsResponse{}, resp)
}

func makeMetricsServiceClient(addr net.Addr) (pdatagrpc.MetricsClient, func(), error) {
Expand Down
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -53,15 +54,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "trace")

// Export implements the service Export traces func.
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (pdatagrpc.TracesResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
return nil, err
return pdatagrpc.TracesResponse{}, err
}

return nil, nil
return pdatagrpc.NewTracesResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestExport_ErrorConsumer(t *testing.T) {
req := testdata.GenerateTracesOneSpan()
resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
assert.Equal(t, pdatagrpc.TracesResponse{}, resp)
}

func makeTraceServiceClient(addr net.Addr) (pdatagrpc.TracesClient, func(), error) {
Expand Down