diff --git a/docs/components.md b/docs/components.md index d474750081..b41a30e24c 100644 --- a/docs/components.md +++ b/docs/components.md @@ -90,7 +90,7 @@ The distribution offers support for the following components. | [logging](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/loggingexporter) | [in development] | | [otlp](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlpexporter) | [stable] | | [otlphttp](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) | [stable] | -| [pulsar](../internal/exporter/pulsarexporter) | [deprecated] | +| [pulsar](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/pulsarexporter) | [alpha] | | [signalfx](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/signalfxexporter) | [beta] | | [sapm](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/sapmexporter) | [beta] | | [splunk_hec](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/splunkhecexporter) | [beta] | diff --git a/go.mod b/go.mod index 2f096264f7..57f3703fd9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.20 require ( github.com/alecthomas/participle/v2 v2.1.0 github.com/antonmedv/expr v1.15.3 - github.com/apache/pulsar-client-go v0.11.0 github.com/cenkalti/backoff/v4 v4.2.1 github.com/fsnotify/fsnotify v1.6.0 github.com/go-zookeeper/zk v1.0.3 @@ -19,6 +18,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.85.0 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter v0.85.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter v0.85.0 @@ -102,7 +102,6 @@ require ( go.opentelemetry.io/collector v0.85.0 go.opentelemetry.io/collector/config/confighttp v0.85.0 go.opentelemetry.io/collector/config/configtelemetry v0.85.0 - go.opentelemetry.io/collector/config/configtls v0.85.0 go.opentelemetry.io/collector/confmap v0.85.0 go.opentelemetry.io/collector/connector v0.85.0 go.opentelemetry.io/collector/connector/forwardconnector v0.85.0 @@ -144,6 +143,7 @@ require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/apache/arrow/go/v12 v12.0.1 // indirect + github.com/apache/pulsar-client-go v0.11.0 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect github.com/bmatcuk/doublestar/v4 v4.6.0 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect @@ -183,6 +183,7 @@ require ( go.opentelemetry.io/collector/config/configgrpc v0.85.0 // indirect go.opentelemetry.io/collector/config/confignet v0.85.0 // indirect go.opentelemetry.io/collector/config/configopaque v0.85.0 // indirect + go.opentelemetry.io/collector/config/configtls v0.85.0 // indirect go.opentelemetry.io/collector/config/internal v0.85.0 // indirect go.opentelemetry.io/collector/extension/auth v0.85.0 // indirect go.opentelemetry.io/otel/bridge/opencensus v0.40.0 // indirect diff --git a/go.sum b/go.sum index dd7fedeccc..e1136397ba 100644 --- a/go.sum +++ b/go.sum @@ -1113,6 +1113,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.85.0 h1:zCFV4nSdHziunsWYB/Zwsy2C4W88KlACyENQyjLZHw8= github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.85.0/go.mod h1:ADyJg6g2zJ3t6FMGOdw0PjmFhZhDxNT2QVFdt6ZFl5k= github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.85.0 h1:6/OlktNNS8X3pL/Ry1pBAjwB4QGXjOm7MvF7Qjucbso= +github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter v0.85.0 h1:NdfsmyqEoPlb3cpE4BA0CgsIGfKgeQrFbf1JFjfT6H4= +github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter v0.85.0/go.mod h1:Fv9WTcMmTppFfwU0FtmtRpiGtEnI9BMcxe8WxPpyuTo= github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter v0.85.0 h1:RMprMmQjOZN5rIjKlMytSia2jm82yqLHx/6uKavmJUg= github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter v0.85.0/go.mod h1:OHiRqFpbPfvGDbNafskojP6nBLw2ARY4UTenttfztQs= github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter v0.85.0 h1:fEe/yJLui8Xaqe8M+f3Vu3U61zW4CtJKW578PgMKwVs= diff --git a/internal/components/components.go b/internal/components/components.go index 3eb55627d6..c108b22170 100644 --- a/internal/components/components.go +++ b/internal/components/components.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" @@ -100,8 +101,6 @@ import ( "go.uber.org/multierr" "github.com/signalfx/splunk-otel-collector/internal/exporter/httpsinkexporter" - //lint:ignore SA1019 We will replace the exporter in future versions - "github.com/signalfx/splunk-otel-collector/internal/exporter/pulsarexporter" "github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver" "github.com/signalfx/splunk-otel-collector/internal/receiver/discoveryreceiver" "github.com/signalfx/splunk-otel-collector/internal/receiver/lightprometheusreceiver" @@ -189,7 +188,6 @@ func Get() (otelcol.Factories, error) { signalfxexporter.NewFactory(), splunkhecexporter.NewFactory(), httpsinkexporter.NewFactory(), - //lint:ignore SA1019 We will replace the exporter in future versions pulsarexporter.NewFactory(), ) if err != nil { diff --git a/internal/exporter/pulsarexporter/README.md b/internal/exporter/pulsarexporter/README.md deleted file mode 100644 index 9f322e1961..0000000000 --- a/internal/exporter/pulsarexporter/README.md +++ /dev/null @@ -1,12 +0,0 @@ -# Pulsar Exporter - -| Status | | -| ------------------------ |-----------------------| -| Stability | [deprecated] | -| Supported pipeline types | metrics | - -The Pulsar exporter is used to export metrics to Apache Pulsar. - -The exporter is deprecated and will be replaced by the [OpenTelemetry Collector upstream exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/pulsarexporter). - -[deprecated]:https://github.com/open-telemetry/opentelemetry-collector#deprecated \ No newline at end of file diff --git a/internal/exporter/pulsarexporter/config.go b/internal/exporter/pulsarexporter/config.go deleted file mode 100644 index e6101e0422..0000000000 --- a/internal/exporter/pulsarexporter/config.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - "errors" - "fmt" - "time" - - "github.com/apache/pulsar-client-go/pulsar" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.uber.org/zap" -) - -type Authentication struct { - TLS *configtls.TLSClientSetting `mapstructure:"tls"` -} - -// Config defines configuration for pulsar exporter. -type Config struct { - exporterhelper.QueueSettings `mapstructure:"sending_queue"` - Authentication Authentication `mapstructure:"auth"` - Broker string `mapstructure:"broker"` - Topic string `mapstructure:"topic"` - Encoding string `mapstructure:"encoding"` - Producer Producer `mapstructure:"producer"` - exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` - exporterhelper.TimeoutSettings `mapstructure:",squash"` - OperationTimeout time.Duration `mapstructure:"operation_timeout"` - ConnectionTimeout time.Duration `mapstructure:"connection_timeout"` -} - -// Producer defines configuration for producer -type Producer struct { - Properties map[string]string `mapstructure:"producer_properties"` - MaxReconnectToBroker *uint `mapstructure:"max_reconnect_broker"` - SendTimeout *time.Duration `mapstructure:"send_timeout"` - BatcherBuilderType string `mapstructure:"batch_builder_type"` - CompressionType string `mapstructure:"compression_type"` - CompressionLevel string `mapstructure:"compression_level"` - HashingScheme string `mapstructure:"hashing_scheme"` - MaxPendingMessages int `mapstructure:"max_pending_messages"` - PartitionsAutoDiscoveryInterval time.Duration `mapstructure:"partitions_auto_discovery_interval"` - BatchingMaxPublishDelay time.Duration `mapstructure:"batching_max_publish_delay"` - BatchingMaxMessages uint `mapstructure:"batching_max_messages"` - BatchingMaxSize uint `mapstructure:"batching_max_size"` - DisableBlockIfQueueFull bool `mapstructure:"disable_block_if_queue_full"` - DisableBatching bool `mapstructure:"disable_batching"` -} - -var _ component.Config = (*Config)(nil) - -// Validate checks if the exporter configuration is valid -func (cfg *Config) Validate() error { - return nil -} - -func (cfg *Config) getClientOptions() (pulsar.ClientOptions, error) { - - options := pulsar.ClientOptions{ - URL: cfg.Broker, - OperationTimeout: cfg.OperationTimeout, - ConnectionTimeout: cfg.ConnectionTimeout, - MaxConnectionsPerBroker: 1, - } - - if cfg.Authentication.TLS.InsecureSkipVerify { - options.TLSAllowInsecureConnection = cfg.Authentication.TLS.InsecureSkipVerify - return options, nil - } - - if len(cfg.Authentication.TLS.CAFile) > 0 && len(cfg.Authentication.TLS.CertFile) > 0 && len(cfg.Authentication.TLS.KeyFile) > 0 { - options.TLSTrustCertsFilePath = cfg.Authentication.TLS.CAFile - options.Authentication = pulsar.NewAuthenticationTLS(cfg.Authentication.TLS.CertFile, cfg.Authentication.TLS.KeyFile) - } else { - return options, errors.New("failed to load TLS config. If certs are not available, set insecure_skip_verify to true for insecure connection") - } - - return options, nil -} - -func (cfg *Config) getProducerOptions(logger *zap.Logger) (pulsar.ProducerOptions, error) { - // Properties are not used. Issue a warning that these are no longer used. - if cfg.Producer.Properties != nil { - logger.Warn("`producer.properties` is no longer used and will be removed in a subsequent release. Please remove this property from the configuration.") - } - if cfg.Producer.BatcherBuilderType == "1" { - logger.Warn("`producer.batch_builder_type` value 1 is deprecated and should use the value `key_based` instead.") - } - if cfg.Producer.BatcherBuilderType == "0" { - logger.Warn("`producer.batch_builder_type` value 0 is deprecated and should use the value `default` instead.") - } - timeout := cfg.Timeout - if cfg.Producer.SendTimeout != nil { - logger.Warn("`producer.send_timeout` is deprecated and will be removed in a subsequent release. Please use `timeout` instead") - timeout = *cfg.Producer.SendTimeout - } - - producerOptions := pulsar.ProducerOptions{ - Topic: cfg.Topic, - DisableBatching: cfg.Producer.DisableBatching, - SendTimeout: timeout, - DisableBlockIfQueueFull: cfg.Producer.DisableBlockIfQueueFull, - MaxPendingMessages: cfg.Producer.MaxPendingMessages, - BatchingMaxPublishDelay: cfg.Producer.BatchingMaxPublishDelay, - BatchingMaxSize: cfg.Producer.BatchingMaxSize, - BatchingMaxMessages: cfg.Producer.BatchingMaxMessages, - PartitionsAutoDiscoveryInterval: cfg.Producer.PartitionsAutoDiscoveryInterval, - MaxReconnectToBroker: cfg.Producer.MaxReconnectToBroker, - } - - batchBuilderType, err := stringToBatchBuilderType(cfg.Producer.BatcherBuilderType) - if err != nil { - return producerOptions, err - } - producerOptions.BatcherBuilderType = batchBuilderType - - compressionType, err := stringToCompressionType(cfg.Producer.CompressionType) - if err != nil { - return producerOptions, err - } - producerOptions.CompressionType = compressionType - - compressionLevel, err := stringToCompressionLevel(cfg.Producer.CompressionLevel) - if err != nil { - return producerOptions, err - } - producerOptions.CompressionLevel = compressionLevel - - hashingScheme, err := stringToHashingScheme(cfg.Producer.HashingScheme) - if err != nil { - return producerOptions, err - } - producerOptions.HashingScheme = hashingScheme - - return producerOptions, nil -} - -func stringToCompressionType(compressionType string) (pulsar.CompressionType, error) { - switch compressionType { - case "none": - return pulsar.NoCompression, nil - case "lz4": - return pulsar.LZ4, nil - case "zlib": - return pulsar.ZLib, nil - case "zstd": - return pulsar.ZSTD, nil - default: - return pulsar.NoCompression, fmt.Errorf("producer.compressionType should be one of 'none', 'lz4', 'zlib', or 'zstd'. configured value %v. Assigning default value as none", compressionType) - } -} - -func stringToCompressionLevel(compressionLevel string) (pulsar.CompressionLevel, error) { - switch compressionLevel { - case "default": - return pulsar.Default, nil - case "faster": - return pulsar.Faster, nil - case "better": - return pulsar.Better, nil - default: - return pulsar.Default, fmt.Errorf("producer.compressionLevel should be one of 'default', 'faster', or 'better'. configured value %v. Assigning default value as default", compressionLevel) - } -} - -func stringToHashingScheme(hashingScheme string) (pulsar.HashingScheme, error) { - switch hashingScheme { - case "java_string_hash": - return pulsar.JavaStringHash, nil - case "murmur3_32hash": - return pulsar.Murmur3_32Hash, nil - default: - return pulsar.JavaStringHash, fmt.Errorf("producer.hashingScheme should be one of 'java_string_hash' or 'murmur3_32hash'. configured value %v, Assigning default value as java_string_hash", hashingScheme) - } -} - -func stringToBatchBuilderType(builderType string) (pulsar.BatcherBuilderType, error) { - switch builderType { - case "0": - return pulsar.DefaultBatchBuilder, nil - case "1": - return pulsar.KeyBasedBatchBuilder, nil - case "default": - return pulsar.DefaultBatchBuilder, nil - case "key_based": - return pulsar.KeyBasedBatchBuilder, nil - default: - return pulsar.DefaultBatchBuilder, fmt.Errorf("producer.batchBuilderType should be one of 'default' or 'key_based'. configured value %v. Assigning default value as default", builderType) - } -} diff --git a/internal/exporter/pulsarexporter/doc.go b/internal/exporter/pulsarexporter/doc.go deleted file mode 100644 index df1770cc34..0000000000 --- a/internal/exporter/pulsarexporter/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Deprecated: This exporter will be replaced by github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter -// Package pulsarexporter exports trace data to Pulsar. -package pulsarexporter diff --git a/internal/exporter/pulsarexporter/factory.go b/internal/exporter/pulsarexporter/factory.go deleted file mode 100644 index 4bb3d66f2b..0000000000 --- a/internal/exporter/pulsarexporter/factory.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - ctx "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper" -) - -const ( - typeStr = "pulsar" - defaultMetricsTopic = "otlp_metrics" - defaultEncoding = "otlp_proto" - defaultBroker = "pulsar://localhost:6651" - defaultCompressionType = "none" - defaultCompressionLevel = "default" - defaultHashingScheme = "java_string_hash" -) - -// FactoryOption applies changes to pulsarExporterFactory. -type FactoryOption func(factory *pulsarExporterFactory) - -// Deprecated: This exporter will be replaced by github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter -// NewFactory creates pulsar exporter factory. -func NewFactory(options ...FactoryOption) exporter.Factory { - f := &pulsarExporterFactory{ - metricsMarshalers: metricsMarshalers(), - } - for _, option := range options { - option(f) - } - return exporter.NewFactory( - typeStr, - createDefaultConfig, - exporter.WithMetrics(f.createMetricsExporter, component.StabilityLevelAlpha), - ) -} - -type pulsarExporterFactory struct { - metricsMarshalers map[string]MetricsMarshaler -} - -func createDefaultConfig() component.Config { - - return &Config{ - TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), - RetrySettings: exporterhelper.NewDefaultRetrySettings(), - QueueSettings: exporterhelper.NewDefaultQueueSettings(), - Broker: defaultBroker, - Topic: defaultMetricsTopic, - Encoding: defaultEncoding, - Producer: Producer{ - CompressionType: defaultCompressionType, - CompressionLevel: defaultCompressionLevel, - HashingScheme: defaultHashingScheme, - }, - Authentication: Authentication{TLS: &configtls.TLSClientSetting{ - InsecureSkipVerify: true, - }}, - } -} - -func (f *pulsarExporterFactory) createMetricsExporter( - ctx ctx.Context, - settings exporter.CreateSettings, - cfg component.Config, -) (exporter.Metrics, error) { - settings.Logger.Warn("the pulsarexporter component is deprecated and will be replaced by github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter") - oCfg := cfg.(*Config) - if oCfg.Encoding == "otlp_json" { - settings.Logger.Info("otlp_json is considered experimental and should not be used in a production environment") - } - exp, err := newMetricsExporter(*oCfg, settings, f.metricsMarshalers) - if err != nil { - return nil, err - } - return exporterhelper.NewMetricsExporter( - ctx, - settings, - cfg, - exp.metricsDataPusher, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), - exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueue(oCfg.QueueSettings), - exporterhelper.WithShutdown(exp.Close)) -} diff --git a/internal/exporter/pulsarexporter/factory_test.go b/internal/exporter/pulsarexporter/factory_test.go deleted file mode 100644 index a0f2a09b06..0000000000 --- a/internal/exporter/pulsarexporter/factory_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright Splunk, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exportertest" -) - -func TestCreateDefaultConfig(t *testing.T) { - cfg := createDefaultConfig().(*Config) - assert.NotNil(t, cfg, "failed to create default config") - assert.NoError(t, componenttest.CheckConfigStruct(cfg)) - assert.Equal(t, defaultBroker, cfg.Broker) - assert.Equal(t, defaultMetricsTopic, cfg.Topic) -} - -func TestCreateMetricsExporter_err(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Broker = "invalid:9092" - mf := pulsarExporterFactory{metricsMarshalers: metricsMarshalers()} - mr, err := mf.createMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) - require.Error(t, err) - assert.Nil(t, mr) -} diff --git a/internal/exporter/pulsarexporter/marshaler.go b/internal/exporter/pulsarexporter/marshaler.go deleted file mode 100644 index 9c3972aa05..0000000000 --- a/internal/exporter/pulsarexporter/marshaler.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - "github.com/apache/pulsar-client-go/pulsar" - "go.opentelemetry.io/collector/pdata/pmetric" -) - -// MetricsMarshaler marshals metrics into Message array -type MetricsMarshaler interface { - // Marshal serializes metrics into pulsar's ProducerMessage - Marshal(metrics pmetric.Metrics) ([]*pulsar.ProducerMessage, error) - - // Encoding returns encoding name - Encoding() string -} - -// metricsMarshalers returns map of supported encodings and MetricsMarshaler -func metricsMarshalers() map[string]MetricsMarshaler { - otlpPb := newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding) - otlpJSON := newPdataMetricsMarshaler(&pmetric.JSONMarshaler{}, "otlp_json") - return map[string]MetricsMarshaler{ - otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, - } -} diff --git a/internal/exporter/pulsarexporter/marshaler_test.go b/internal/exporter/pulsarexporter/marshaler_test.go deleted file mode 100644 index 6c9973e66c..0000000000 --- a/internal/exporter/pulsarexporter/marshaler_test.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright The OpenTelemetry Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDefaultMetricsMarshalers(t *testing.T) { - expectedEncodings := []string{ - "otlp_proto", - "otlp_json", - } - marshalers := metricsMarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) - for _, e := range expectedEncodings { - t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) - assert.NotNil(t, m) - }) - } -} diff --git a/internal/exporter/pulsarexporter/pdata_marshaler.go b/internal/exporter/pulsarexporter/pdata_marshaler.go deleted file mode 100644 index e4eb655e91..0000000000 --- a/internal/exporter/pulsarexporter/pdata_marshaler.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - "github.com/apache/pulsar-client-go/pulsar" - "go.opentelemetry.io/collector/pdata/pmetric" -) - -type pdataMetricsMarshaler struct { - marshaler pmetric.Marshaler - encoding string -} - -func (p pdataMetricsMarshaler) Marshal(metrics pmetric.Metrics) ([]*pulsar.ProducerMessage, error) { - bts, err := p.marshaler.MarshalMetrics(metrics) - if err != nil { - return nil, err - } - return []*pulsar.ProducerMessage{ - { - Payload: bts, - }, - }, nil -} - -func (p pdataMetricsMarshaler) Encoding() string { - return p.encoding -} - -func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler { - return pdataMetricsMarshaler{ - marshaler: marshaler, - encoding: encoding, - } -} diff --git a/internal/exporter/pulsarexporter/pulsar_exporter.go b/internal/exporter/pulsarexporter/pulsar_exporter.go deleted file mode 100644 index 6f3758f5bb..0000000000 --- a/internal/exporter/pulsarexporter/pulsar_exporter.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsarexporter - -import ( - "context" - "fmt" - - "github.com/apache/pulsar-client-go/pulsar" - "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/multierr" - "go.uber.org/zap" -) - -var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") - -// pulsarMetricsExporter produce metrics messages to pulsar -type pulsarMetricsExporter struct { - producer pulsar.Producer - marshaler MetricsMarshaler - logger *zap.Logger - topic string -} - -func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers map[string]MetricsMarshaler) (*pulsarMetricsExporter, error) { - marshaler := marshalers[config.Encoding] - if marshaler == nil { - return nil, errUnrecognizedEncoding - } - producer, err := newPulsarProducer(config, set.Logger) - - if err != nil { - return nil, err - } - - return &pulsarMetricsExporter{ - producer: producer, - topic: config.Topic, - marshaler: marshaler, - logger: set.Logger, - }, nil -} - -func newPulsarProducer(config Config, logger *zap.Logger) (pulsar.Producer, error) { - // Get pulsar client options - clientOptions, clientOptionsErr := config.getClientOptions() - if clientOptionsErr != nil { - return nil, clientOptionsErr - } - - // Initiate pulsar client - client, clientErr := pulsar.NewClient(clientOptions) - if clientErr != nil { - return nil, clientErr - } - - // Get pulsar producer options - producerOptions, producerOptionsErr := config.getProducerOptions(logger) - if producerOptionsErr != nil { - return nil, producerOptionsErr - } - - // Initiate pulsar producer - producer, producerErr := client.CreateProducer(producerOptions) - if producerErr != nil { - return nil, producerErr - } - - return producer, nil -} - -func (e *pulsarMetricsExporter) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error { - messages, err := e.marshaler.Marshal(md) - if err != nil { - return consumererror.NewPermanent(err) - } - var errors error - for _, element := range messages { - e.producer.SendAsync(ctx, element, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { - if err != nil { - errors = multierr.Append(errors, err) - } - }) - } - if errors == nil { - return nil - } - return fmt.Errorf("pulsar producer failed to send metric data due to error: %w", errors) -} - -func (e *pulsarMetricsExporter) Close(context.Context) error { - e.producer.Close() - return nil -} diff --git a/internal/exporter/pulsarexporter/testdata/config.yaml b/internal/exporter/pulsarexporter/testdata/config.yaml deleted file mode 100644 index 925677a8f7..0000000000 --- a/internal/exporter/pulsarexporter/testdata/config.yaml +++ /dev/null @@ -1,38 +0,0 @@ -processors: - nop: - -receivers: - nop: - -exporters: - pulsar: - topic: otlp_metrics - broker: pulsar+ssl://localhost:6651 - timeout: 0 - auth: - tls: - ca_file: "/path/to/cacert" - cert_file: "/path/to/cert" - key_file: "/path/to/key" - insecure_skip_verify: true - producer: - disable_block_if_queue_full: false - max_pending_messages: 100 - hashing_scheme: java_string_hash - compression_type: zstd - compression_level: default - batch_builder_type: key_based - disable_batching: false - # unit is nanoseconds (10^-9), set to 10 milliseconds in nanoseconds - batching_max_publish_delay: 10000000 - batching_max_messages: 1000 - batching_max_size: 128000 - # unit is nanoseconds (10^-9), set to 1 minute in nanoseconds - partitions_auto_discovery_interval: 60000000000 - -service: - pipelines: - metrics: - receivers: [ nop ] - processors: [ nop ] - exporters: [ pulsar ]