Add Google Pubsub as an OTLP exporter/receiver #1892

Closed
wants to merge 2 commits into from
4 changes: 4 additions & 0 deletions cmd/otelcontribcol/components.go
@@ -30,6 +30,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/f5cloudexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/gcloudpubsubexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/honeycombexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerthrifthttpexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
@@ -56,6 +57,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/gcloudpubsubreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver"
@@ -114,6 +116,7 @@ func components() (component.Factories, error) {
wavefrontreceiver.NewFactory(),
windowsperfcountersreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
gcloudpubsubreceiver.NewFactory(),
}

receivers = append(receivers, extraReceivers()...)
@@ -149,6 +152,7 @@ func components() (component.Factories, error) {
splunkhecexporter.NewFactory(),
stackdriverexporter.NewFactory(),
sumologicexporter.NewFactory(),
gcloudpubsubexporter.NewFactory(),
}
for _, exp := range factories.Exporters {
exporters = append(exporters, exp)
1 change: 1 addition & 0 deletions exporter/gcloudpubsubexporter/Makefile
@@ -0,0 +1 @@
include ../../Makefile.Common
23 changes: 23 additions & 0 deletions exporter/gcloudpubsubexporter/README.md
@@ -0,0 +1,23 @@
# Google Pubsub Exporter

This exporter sends OTLP messages to a Google Cloud [Pubsub](https://cloud.google.com/pubsub) topic.

The following configuration options are supported:
Contributor

Missing documentation for user agent


* `project` (Required): The Google Cloud Project of the topics.
* `validate_existence` (Optional): Checks that each configured topic exists when the exporter starts; this requires admin permissions to validate
Contributor

It would be good to list out the actual permission names required, both for normal use and when setting this flag.

Also, I would consider removing this flag: it might just complicate things, and failing when sending messages isn't that bad. Many of our exporters work that way.

Member @gramidt, Dec 30, 2020

This check is interesting. It could make sense to run this verification on exporter startup (how you implemented it), as that gives a fast feedback loop and prevents trying to send data that is going to fail. However, topics can be created at any time. Since the collector uses a static config, would it be safe to assume that the topic name is known and has been created prior to configuring a collector? Or is it best to take a fully asynchronous approach and assume a topic may exist at some time in the future?
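
To make the trade-off in this comment concrete, here is a minimal sketch of a fail-fast startup check guarded by a flag. Everything in it is hypothetical and stdlib-only: `topicChecker`, `fakeTopic`, `checkAtStartup` and `runCheck` are illustrative names, not part of this PR or of the Pub/Sub client library. With the flag off, a missing topic would only surface later, when a publish fails.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTopicMissing = errors.New("topic does not exist")

// topicChecker is a hypothetical abstraction over a client's existence check.
type topicChecker interface {
	Exists(ctx context.Context) (bool, error)
}

// fakeTopic stands in for a real topic handle so the sketch runs offline.
type fakeTopic struct{ exists bool }

func (f fakeTopic) Exists(context.Context) (bool, error) { return f.exists, nil }

// checkAtStartup fails fast when validation is enabled and the topic is missing.
// When validation is disabled it does nothing, leaving errors to publish time.
func checkAtStartup(ctx context.Context, name string, t topicChecker, validate bool, timeout time.Duration) error {
	if !validate {
		return nil
	}
	tctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	ok, err := t.Exists(tctx)
	if err != nil {
		return fmt.Errorf("checking topic %q: %w", name, err)
	}
	if !ok {
		return fmt.Errorf("topic %q: %w", name, errTopicMissing)
	}
	return nil
}

// runCheck wires the fake topic into checkAtStartup for the demo below.
func runCheck(exists, validate bool) error {
	return checkAtStartup(context.Background(), "otlp-traces", fakeTopic{exists: exists}, validate, time.Second)
}

func main() {
	fmt.Println(runCheck(false, true))  // validation on, topic missing: error
	fmt.Println(runCheck(false, false)) // validation off: no error at startup
	fmt.Println(runCheck(true, true))   // validation on, topic present: no error
}
```

The asynchronous alternative raised above corresponds to always passing `validate = false` and relying on publish errors and retries.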

the existence.
* `traces_topic` (Optional): The name of the topic, within the project, to which OTLP trace data is sent.
* `metrics_topic` (Optional): The name of the topic, within the project, to which OTLP metric data is sent.
* `logs_topic` (Optional): The name of the topic, within the project, to which OTLP log data is sent.

```yaml
exporters:
  gcloudpubsub:
    project: my-project
    validate_existence: false
    traces_topic: otlp-traces
    metrics_topic: otlp-metrics
    logs_topic: otlp-logs
```
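
As an illustration of how the options above could be checked, the following is a rough sketch only. The `Config` struct here is a simplified stand-in for the exporter's real type, and `validate` is a hypothetical helper that does not exist in this PR. The rule that at least one topic must be set is an assumption made for the example; the README only states that `project` is required.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a simplified stand-in mirroring the README options.
type Config struct {
	Project      string
	TracesTopic  string
	MetricsTopic string
	LogsTopic    string
}

// validate is a hypothetical helper, not part of the PR.
func validate(cfg Config) error {
	if cfg.Project == "" {
		return errors.New("project is required")
	}
	// Assumption for this sketch: an exporter with no topic at all is a misconfiguration.
	if cfg.TracesTopic == "" && cfg.MetricsTopic == "" && cfg.LogsTopic == "" {
		return errors.New("at least one of traces_topic, metrics_topic or logs_topic should be set")
	}
	return nil
}

func main() {
	fmt.Println(validate(Config{Project: "my-project", TracesTopic: "otlp-traces"}))
	fmt.Println(validate(Config{TracesTopic: "otlp-traces"}))
}
```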

36 changes: 36 additions & 0 deletions exporter/gcloudpubsubexporter/config.go
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcloudpubsubexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

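// Config defines the configuration for the Google Cloud Pub/Sub exporter.
type Config struct {
	configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
	// ProjectID is the Google Cloud project that owns the topics.
	ProjectID string `mapstructure:"project"`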
Contributor

Let's add some comments. Endpoint in particular needs something on when it would be set (probably very rarely by normal users)

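	// UserAgent is the user agent string passed to the Pub/Sub client.
	UserAgent string `mapstructure:"user_agent"`
	// Endpoint overrides the default Pub/Sub API endpoint. Most users should
	// leave it empty; it is typically only set to target a test server.
	Endpoint string `mapstructure:"endpoint"`
	// UseInsecure disables transport security. Only has effect if Endpoint is not "".
	UseInsecure bool `mapstructure:"use_insecure"`
	// Timeout for all API calls. If not set, defaults to 12 seconds.
	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

	// ValidateExistence makes Start fail when a configured topic does not exist.
	ValidateExistence bool `mapstructure:"validate_existence"`
	// MetricsTopic, TracesTopic and LogsTopic are topic names within ProjectID.
	MetricsTopic string `mapstructure:"metrics_topic"`
	TracesTopic  string `mapstructure:"traces_topic"`
	LogsTopic    string `mapstructure:"logs_topic"`
}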
64 changes: 64 additions & 0 deletions exporter/gcloudpubsubexporter/config_test.go
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcloudpubsubexporter

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
	factories, err := componenttest.ExampleComponents()
	// Stop immediately if the example components cannot be built.
	require.NoError(t, err)

	factory := NewFactory()
	factories.Exporters[configmodels.Type(typeStr)] = factory
	cfg, err := configtest.LoadConfigFile(
		t, path.Join(".", "testdata", "config.yaml"), factories,
	)

	require.NoError(t, err)
	require.NotNil(t, cfg)

	assert.Len(t, cfg.Exporters, 2)

	// testify expects the expected value first and the actual value second.
	r0 := cfg.Exporters["gcloudpubsub"]
	assert.Equal(t, factory.CreateDefaultConfig(), r0)

	r1 := cfg.Exporters["gcloudpubsub/customname"].(*Config)
	assert.Equal(t,
		&Config{
			ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "gcloudpubsub/customname"},
			ProjectID:        "my-project",
			UserAgent:        "opentelemetry-collector-contrib {{version}}",
			Endpoint:         "test-endpoint",
			UseInsecure:      true,
			TimeoutSettings: exporterhelper.TimeoutSettings{
				Timeout: 20 * time.Second,
			},
			ValidateExistence: true,
			TracesTopic:       "otlp-traces",
			MetricsTopic:      "otlp-metrics",
			LogsTopic:         "otlp-logs",
		}, r1)
}
166 changes: 166 additions & 0 deletions exporter/gcloudpubsubexporter/exporter.go
@@ -0,0 +1,166 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcloudpubsubexporter

import (
"context"
"fmt"

"cloud.google.com/go/pubsub"

Shopify has been running a Google PubSub (trace) exporter in production for almost a year. The largest improvement in stability came from switching to the lower level package cloud.google.com/go/pubsub/apiv1. The high level package used here provides an asynchronous Publish method that does not allow back pressure in the collector, and limits the usefulness of the memorylimiter processor, queued retry, etc. The lower level package provides a synchronous Publish call, which is more appropriate for the collector.

cc @ibawt
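
To illustrate the back-pressure point, here is a minimal stdlib-only sketch. It does not use either Pub/Sub package: `syncPublisher`, `flakyPublisher`, `export` and `exportWithRetry` are hypothetical names, and the retry loop is only a stand-in for the collector's retry helpers. The idea shown is that a synchronous publish returns its error to the caller, so upstream components can see the failure and react.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errUnavailable = errors.New("publish failed: backend unavailable")

// syncPublisher is a hypothetical interface: Publish returns only once the
// backend has accepted or rejected the message.
type syncPublisher interface {
	Publish(ctx context.Context, data []byte) error
}

// flakyPublisher is a test double that rejects its first `failures` calls.
type flakyPublisher struct{ failures int }

func (p *flakyPublisher) Publish(ctx context.Context, data []byte) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if p.failures > 0 {
		p.failures--
		return errUnavailable
	}
	return nil
}

// export hands the publish error back to its caller, so a retry queue or
// memory limiter upstream can observe the failure instead of losing it.
func export(ctx context.Context, p syncPublisher, payload []byte) error {
	return p.Publish(ctx, payload)
}

// exportWithRetry is a stand-in for a retry helper: it reports how many
// attempts were used and the last error, if any.
func exportWithRetry(p syncPublisher, payload []byte, attempts int) (int, error) {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = export(context.Background(), p, payload); err == nil {
			return i, nil
		}
	}
	return attempts, err
}

func main() {
	used, err := exportWithRetry(&flakyPublisher{failures: 2}, []byte("otlp"), 5)
	fmt.Println(used, err)
}
```

With an asynchronous publish whose result is discarded, `export` would always return `nil` and the retry loop would never run.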

Contributor Author

Thanks, @fbogsany. I will certainly rewrite this, as I was struggling a bit with it. I'm writing the OTEP now and will refer to this in this thread.

Contributor Author

> Shopify has been running a Google PubSub (trace) exporter in production for almost a year. The largest improvement in stability came from switching to the lower level package cloud.google.com/go/pubsub/apiv1. The high level package used here provides an asynchronous Publish method that does not allow back pressure in the collector, and limits the usefulness of the memorylimiter processor, queued retry, etc. The lower level package provides a synchronous Publish call, which is more appropriate for the collector.
>
> cc @ibawt

At Shopify, do you push the complete OTLP request over PubSub, or are you unraveling the content? Let me elaborate: for spans, the ExportTraceServiceRequest contains a repeated ResourceSpans, so the request is already a mini-batch. I'm going to propose putting the ResourceSpans on Pubsub (or any message bus).

We publish a full ExportTraceServiceRequest. We treat PubSub as a transport for OTLP, which makes things easier for downstream consumers. In particular, we can reuse code between the collector and the consumer. For example, we have an exporter from PubSub to GCS (Avro) that we'll eventually move into the collector, so it helps if that receives the data it would receive if it were in the collector instead.

Specifically, for traces pdata.Traces, we Publish the gzipped bytes returned from traces.ToOtlpProtoBytes().
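
The gzip step described here can be sketched with the standard library alone. This is an illustration under assumptions, not Shopify's code: `compress` and `decompress` are hypothetical helper names, and the payload is a placeholder for the serialized OTLP bytes.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips a serialized payload before it is used as message data.
func compress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		return nil, err
	}
	// Close flushes buffered data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compress on the consumer side.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	payload := []byte("stand-in for serialized OTLP bytes")
	packed, err := compress(payload)
	if err != nil {
		panic(err)
	}
	unpacked, err := decompress(packed)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(payload, unpacked))
}
```

Because producer and consumer share the same pair of functions, this matches the code-reuse argument made in the comment above.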

Contributor

While it's a big effort, I do wish that OTLP had a FullSpan or similar type: a normalized span with its resource and instrumentation library built in. The ability to pack a self-contained span helps with cases like message queues, downstream processing such as metrics updating, or even updating a trace with a new span. These cases generally model better as span-per-message when dealing with retries and the like.
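
A rough sketch of what such a self-contained span could look like follows. All types here are hypothetical simplifications, not OTLP messages: `resourceSpans` stands in for the batched form, `fullSpan` for the proposed span-per-message form, and a single `service` string replaces the full resource and instrumentation library data.

```go
package main

import "fmt"

// span is a simplified stand-in for an OTLP span.
type span struct{ name string }

// resourceSpans groups spans under one resource, as a batched request does.
type resourceSpans struct {
	service string // stand-in for the resource attributes
	spans   []span
}

// fullSpan carries its resource with it, so it can travel as a single message.
type fullSpan struct {
	service string
	span    span
}

// flatten denormalizes a batch into self-contained spans, one per message.
func flatten(batch []resourceSpans) []fullSpan {
	var out []fullSpan
	for _, rs := range batch {
		for _, s := range rs.spans {
			out = append(out, fullSpan{service: rs.service, span: s})
		}
	}
	return out
}

func main() {
	batch := []resourceSpans{
		{service: "checkout", spans: []span{{name: "GET /cart"}, {name: "POST /pay"}}},
		{service: "inventory", spans: []span{{name: "reserve"}}},
	}
	for _, fs := range flatten(batch) {
		fmt.Println(fs.service, fs.span.name)
	}
}
```

The cost of this form is that the resource data is repeated in every message, which is the trade-off against publishing the whole request as one message.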

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

const name = "gcloudpubsub"

// pubsubExporter publishes OTLP traces, metrics and logs to Google Cloud Pub/Sub topics.
type pubsubExporter struct {
	instanceName string
	logger       *zap.Logger

	tracesTopicName  string
	metricsTopicName string
	logsTopicName    string

	tracesTopic  *pubsub.Topic
	metricsTopic *pubsub.Topic
	logsTopic    *pubsub.Topic

	userAgent string
	config    *Config

	client *pubsub.Client
}

func (*pubsubExporter) Name() string {
return name
}

func (ex *pubsubExporter) Start(ctx context.Context, _ component.Host) error {
	if ex.client == nil {
		// Do not discard these errors: a nil client would panic on first use.
		copts, err := ex.generateClientOptions()
		if err != nil {
			return err
		}
		client, err := pubsub.NewClient(context.Background(), ex.config.ProjectID, copts...)
		if err != nil {
			return fmt.Errorf("failed creating the Pub/Sub client: %w", err)
		}
		ex.client = client
	}

	if ex.tracesTopic == nil && ex.tracesTopicName != "" {
		ex.tracesTopic = ex.client.TopicInProject(ex.tracesTopicName, ex.config.ProjectID)
		if ex.config.ValidateExistence {
			tctx, cancel := context.WithTimeout(ctx, ex.config.Timeout)
			defer cancel()
			exist, err := ex.tracesTopic.Exists(tctx)
			if err != nil {
				return err
			}
			if !exist {
				return fmt.Errorf("trace topic %s doesn't exist", ex.tracesTopic)
			}
		}
	}
	if ex.metricsTopic == nil && ex.metricsTopicName != "" {
		ex.metricsTopic = ex.client.TopicInProject(ex.metricsTopicName, ex.config.ProjectID)
		if ex.config.ValidateExistence {
			tctx, cancel := context.WithTimeout(ctx, ex.config.Timeout)
			defer cancel()
			exist, err := ex.metricsTopic.Exists(tctx)
			if err != nil {
				return err
			}
			if !exist {
				// Report the metrics topic, not the traces topic.
				return fmt.Errorf("metric topic %s doesn't exist", ex.metricsTopic)
			}
		}
	}
	if ex.logsTopic == nil && ex.logsTopicName != "" {
		ex.logsTopic = ex.client.TopicInProject(ex.logsTopicName, ex.config.ProjectID)
		if ex.config.ValidateExistence {
			tctx, cancel := context.WithTimeout(ctx, ex.config.Timeout)
			defer cancel()
			exist, err := ex.logsTopic.Exists(tctx)
			if err != nil {
				return err
			}
			if !exist {
				// Report the logs topic, not the traces topic.
				return fmt.Errorf("log topic %s doesn't exist", ex.logsTopic)
			}
		}
	}
	return nil
}

func (ex *pubsubExporter) Shutdown(context.Context) error {
	// Stop flushes any messages still buffered for each topic.
	if ex.tracesTopic != nil {
		ex.tracesTopic.Stop()
		ex.tracesTopic = nil
	}
	if ex.metricsTopic != nil {
		ex.metricsTopic.Stop()
		ex.metricsTopic = nil
	}
	if ex.logsTopic != nil {
		ex.logsTopic.Stop()
		ex.logsTopic = nil
	}
	if ex.client != nil {
		// Surface a failed close instead of silently dropping the error.
		err := ex.client.Close()
		ex.client = nil
		return err
	}
	return nil
}

func (ex *pubsubExporter) generateClientOptions() ([]option.ClientOption, error) {
	var copts []option.ClientOption
	if ex.userAgent != "" {
		copts = append(copts, option.WithUserAgent(ex.userAgent))
	}
	if ex.config.Endpoint != "" {
		if ex.config.UseInsecure {
			var dialOpts []grpc.DialOption
			if ex.userAgent != "" {
				dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent))
			}
			conn, err := grpc.Dial(ex.config.Endpoint, append(dialOpts, grpc.WithInsecure())...)
			if err != nil {
				// Return the dial error rather than passing a nil connection on.
				return nil, fmt.Errorf("failed dialing %s: %w", ex.config.Endpoint, err)
			}
			copts = append(copts, option.WithGRPCConn(conn))
		} else {
			copts = append(copts, option.WithEndpoint(ex.config.Endpoint))
		}
	}
	return copts, nil
}

func (ex *pubsubExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
	bytes, err := td.ToOtlpProtoBytes()
	if err != nil {
		return err
	}
	message := &pubsub.Message{Data: bytes}
Contributor

Do you think we should put a PR in for the opentelemetry-specification to document how OTLP interacts with Pub/Sub? While it's straightforward to push the bytes into Data, there could still be different interpretations of how Pub/Sub is supposed to work so we probably want to make sure they're unified across implementations.

Contributor Author

Thanks @anuraaga. I'll make one comment in reply to your initial review. Thanks, I needed a second pair of eyes on this.

  • I will add it to the spec. I'll ask on Gitter whether I should do a PR or an OTEP first. I see that Kafka does it the same way I do (sending the Request), so that's good.
  • I will split up the PR but will keep this one open in the meantime (I converted it to draft); I want to have someone from Google involved to do a quick scan.
  • I'll fix all your concerns.

Member

Looking forward to reviewing the spec you draft up, @alexvanboxel! 📘

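	// Wait for the publish result so a failed send is reported to the pipeline.
	if _, err := ex.tracesTopic.Publish(ctx, message).Get(ctx); err != nil {
		return err
	}
	return nil
}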

func (ex *pubsubExporter) ConsumeMetrics(ctx context.Context, td pdata.Metrics) error {
	bytes, err := td.ToOtlpProtoBytes()
	if err != nil {
		return err
	}
	// Wait for the publish result so a failed send is reported to the pipeline.
	_, err = ex.metricsTopic.Publish(ctx, &pubsub.Message{Data: bytes}).Get(ctx)
	return err
}

func (ex *pubsubExporter) ConsumeLogs(ctx context.Context, td pdata.Logs) error {
	bytes, err := td.ToOtlpProtoBytes()
	if err != nil {
		return err
	}
	// Wait for the publish result so a failed send is reported to the pipeline.
	_, err = ex.logsTopic.Publish(ctx, &pubsub.Message{Data: bytes}).Get(ctx)
	return err
}