From 55d1913bdf175ecfcb585b92ea84b5bb4739d931 Mon Sep 17 00:00:00 2001
From: Joe Elliott <number101010@gmail.com>
Date: Mon, 28 Sep 2020 10:37:17 -0400
Subject: [PATCH] Use OTEL Kafka Exporter/Receiver Instead of Jaeger Core
 (#2494)

* build/lint/tests pass

Signed-off-by: Joe Elliott <number101010@gmail.com>

* wip: receiver tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Updated otel collector dep

Signed-off-by: Joe Elliott <number101010@gmail.com>

* wip : test cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Fleshed out receiver config and added tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Split out kafka flags.  Removed merge from test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added exporter config translation and default test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added exporter tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fixed import

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added default protocol version

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Add kafkareceiver to prevent to flags to get defaults

Signed-off-by: Joe Elliott <number101010@gmail.com>

* kafak => kafka

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added config tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added from-flag test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added MustOtelEncodingForJaegerEncoding tests

Signed-off-by: Joe Elliott <number101010@gmail.com>
---
 cmd/ingester/app/flags.go                     |  23 ++-
 .../app/defaultcomponents/defaults.go         |  27 ++-
 .../app/defaultcomponents/defaults_test.go    |   4 -
 .../app/defaultconfig/default_config.go       |   4 +-
 .../app/exporter/kafkaexporter/config.go      |  27 ---
 .../app/exporter/kafkaexporter/config_test.go |  94 ----------
 .../app/exporter/kafkaexporter/exporter.go    |  34 ----
 .../app/exporter/kafkaexporter/factory.go     |  96 ----------
 .../exporter/kafkaexporter/factory_test.go    |  69 -------
 .../kafkaexporter/{doc.go => flags.go}        |  13 +-
 .../exporter/kafkaexporter/kafka_exporter.go  | 129 ++++++++++++++
 .../kafkaexporter/kafka_exporter_test.go      |  88 +++++++++
 .../kafkaexporter/testdata/config.yaml        |  56 +++---
 .../kafkaexporter/testdata/jaeger-config.yaml |   4 +
 cmd/opentelemetry/app/flags.go                |   2 +-
 .../app/receiver/kafkareceiver/config.go      |  27 ---
 .../app/receiver/kafkareceiver/config_test.go |  92 ----------
 .../app/receiver/kafkareceiver/doc.go         |  16 --
 .../app/receiver/kafkareceiver/factory.go     | 107 -----------
 .../receiver/kafkareceiver/factory_test.go    |  69 -------
 .../app/receiver/kafkareceiver/flags.go       |   2 +-
 .../receiver/kafkareceiver/kafka_receiver.go  | 168 ++++++++++++------
 .../kafkareceiver/kafka_receiver_test.go      | 130 ++++++++++++++
 .../kafkareceiver/testdata/config.yaml        |  53 +++---
 .../kafkareceiver/testdata/jaeger-config.yaml |   4 +
 cmd/opentelemetry/go.sum                      |   2 +
 .../hotrod/services/frontend/gen_assets.go    |   4 +-
 plugin/storage/es/mappings/gen_assets.go      |  14 +-
 plugin/storage/kafka/options.go               |  39 ++--
 29 files changed, 594 insertions(+), 803 deletions(-)
 delete mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/config.go
 delete mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/config_test.go
 delete mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/exporter.go
 delete mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/factory.go
 delete mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/factory_test.go
 rename cmd/opentelemetry/app/exporter/kafkaexporter/{doc.go => flags.go} (75%)
 create mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go
 create mode 100644 cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go
 delete mode 100644 cmd/opentelemetry/app/receiver/kafkareceiver/config.go
 delete mode 100644 cmd/opentelemetry/app/receiver/kafkareceiver/config_test.go
 delete mode 100644 cmd/opentelemetry/app/receiver/kafkareceiver/doc.go
 delete mode 100644 cmd/opentelemetry/app/receiver/kafkareceiver/factory.go
 delete mode 100644 cmd/opentelemetry/app/receiver/kafkareceiver/factory_test.go
 create mode 100644 cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go

diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go
index e4b5f5230269..98efb6eb4ce2 100644
--- a/cmd/ingester/app/flags.go
+++ b/cmd/ingester/app/flags.go
@@ -77,6 +77,20 @@ type Options struct {
 
 // AddFlags adds flags for Builder
 func AddFlags(flagSet *flag.FlagSet) {
+	flagSet.String(
+		ConfigPrefix+SuffixParallelism,
+		strconv.Itoa(DefaultParallelism),
+		"The number of messages to process in parallel")
+	flagSet.Duration(
+		ConfigPrefix+SuffixDeadlockInterval,
+		DefaultDeadlockInterval,
+		"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
+	AddOTELFlags(flagSet)
+}
+
+// AddOTELFlags adds only OTEL flags
+func AddOTELFlags(flagSet *flag.FlagSet) {
+	// Authentication flags
 	flagSet.String(
 		KafkaConsumerConfigPrefix+SuffixBrokers,
 		DefaultBroker,
@@ -101,15 +115,6 @@ func AddFlags(flagSet *flag.FlagSet) {
 		KafkaConsumerConfigPrefix+SuffixEncoding,
 		DefaultEncoding,
 		fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
-	flagSet.String(
-		ConfigPrefix+SuffixParallelism,
-		strconv.Itoa(DefaultParallelism),
-		"The number of messages to process in parallel")
-	flagSet.Duration(
-		ConfigPrefix+SuffixDeadlockInterval,
-		DefaultDeadlockInterval,
-		"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
-	// Authentication flags
 	auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
 }
 
diff --git a/cmd/opentelemetry/app/defaultcomponents/defaults.go b/cmd/opentelemetry/app/defaultcomponents/defaults.go
index 1fa72c65919a..70f7eba93525 100644
--- a/cmd/opentelemetry/app/defaultcomponents/defaults.go
+++ b/cmd/opentelemetry/app/defaultcomponents/defaults.go
@@ -21,12 +21,13 @@ import (
 	"github.com/spf13/viper"
 	"go.opentelemetry.io/collector/component"
 	otelJaegerExporter "go.opentelemetry.io/collector/exporter/jaegerexporter"
+	otelKafkaExporter "go.opentelemetry.io/collector/exporter/kafkaexporter"
 	otelResourceProcessor "go.opentelemetry.io/collector/processor/resourceprocessor"
 	otelJaegerReceiver "go.opentelemetry.io/collector/receiver/jaegerreceiver"
+	otelKafkaReceiver "go.opentelemetry.io/collector/receiver/kafkareceiver"
 	otelZipkinReceiver "go.opentelemetry.io/collector/receiver/zipkinreceiver"
 	"go.opentelemetry.io/collector/service/defaultcomponents"
 
-	ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
 	"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/badgerexporter"
 	"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/cassandraexporter"
 	"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter"
@@ -42,7 +43,6 @@ import (
 	cassandraStorage "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
 	esStorage "github.com/jaegertracing/jaeger/plugin/storage/es"
 	grpcStorage "github.com/jaegertracing/jaeger/plugin/storage/grpc"
-	kafkaStorage "github.com/jaegertracing/jaeger/plugin/storage/kafka"
 )
 
 // Components creates default and Jaeger factories
@@ -51,11 +51,6 @@ func Components(v *viper.Viper) component.Factories {
 	// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
 	// OTEL collector creates default configurations for all factories to verify they can be created.
 	addDefaultValuesToViper(v)
-	kafkaExp := &kafkaexporter.Factory{OptionsFactory: func() *kafkaStorage.Options {
-		opts := kafkaexporter.DefaultOptions()
-		opts.InitFromViper(v)
-		return opts
-	}}
 	cassandraExp := &cassandraexporter.Factory{OptionsFactory: func() *cassandraStorage.Options {
 		opts := cassandraexporter.DefaultOptions()
 		opts.InitFromViper(v)
@@ -77,21 +72,22 @@ func Components(v *viper.Viper) component.Factories {
 		opts.InitFromViper(v)
 		return opts
 	})
-	kafkaRec := &kafkareceiver.Factory{OptionsFactory: func() *ingesterApp.Options {
-		opts := kafkareceiver.DefaultOptions()
-		opts.InitFromViper(v)
-		return opts
-	}}
 
 	factories, _ := defaultcomponents.Components()
-	factories.Exporters[kafkaExp.Type()] = kafkaExp
 	factories.Exporters[cassandraExp.Type()] = cassandraExp
 	factories.Exporters[esExp.Type()] = esExp
 	factories.Exporters[grpcExp.Type()] = grpcExp
 	factories.Exporters[memoryExp.Type()] = memoryExp
 	factories.Exporters[badgerExp.Type()] = badgerExp
-	factories.Receivers[kafkaRec.Type()] = kafkaRec
 
+	factories.Receivers[kafkareceiver.TypeStr] = &kafkareceiver.Factory{
+		Wrapped: otelKafkaReceiver.NewFactory(),
+		Viper:   v,
+	}
+	factories.Exporters[kafkaexporter.TypeStr] = &kafkaexporter.Factory{
+		Wrapped: otelKafkaExporter.NewFactory(),
+		Viper:   v,
+	}
 	factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
 		Wrapped: otelJaegerReceiver.NewFactory(),
 		Viper:   v,
@@ -115,7 +111,8 @@ func Components(v *viper.Viper) component.Factories {
 // addDefaultValuesToViper adds Jaeger storage flags to viper to make the default values available.
 func addDefaultValuesToViper(v *viper.Viper) {
 	flagSet := &flag.FlagSet{}
-	kafkaexporter.DefaultOptions().AddFlags(flagSet)
+	kafkareceiver.AddFlags(flagSet)
+	kafkaexporter.AddFlags(flagSet)
 	elasticsearchexporter.DefaultOptions().AddFlags(flagSet)
 	cassandraexporter.DefaultOptions().AddFlags(flagSet)
 	pflagSet := &pflag.FlagSet{}
diff --git a/cmd/opentelemetry/app/defaultcomponents/defaults_test.go b/cmd/opentelemetry/app/defaultcomponents/defaults_test.go
index ab2c13a2497f..950dc782b514 100644
--- a/cmd/opentelemetry/app/defaultcomponents/defaults_test.go
+++ b/cmd/opentelemetry/app/defaultcomponents/defaults_test.go
@@ -34,7 +34,6 @@ import (
 
 func TestComponents(t *testing.T) {
 	v, _ := jConfig.Viperize(
-		kafkaexporter.DefaultOptions().AddFlags,
 		cassandraexporter.DefaultOptions().AddFlags,
 		elasticsearchexporter.DefaultOptions().AddFlags,
 	)
@@ -50,9 +49,6 @@ func TestComponents(t *testing.T) {
 	assert.IsType(t, &kafkareceiver.Factory{}, factories.Receivers[kafkareceiver.TypeStr])
 	assert.IsType(t, &zipkinreceiver.Factory{}, factories.Receivers["zipkin"])
 
-	kafkaFactory := factories.Exporters[kafkaexporter.TypeStr]
-	kc := kafkaFactory.CreateDefaultConfig().(*kafkaexporter.Config)
-	assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)
 	cassandraFactory := factories.Exporters[cassandraexporter.TypeStr]
 	cc := cassandraFactory.CreateDefaultConfig().(*cassandraexporter.Config)
 	assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
diff --git a/cmd/opentelemetry/app/defaultconfig/default_config.go b/cmd/opentelemetry/app/defaultconfig/default_config.go
index f907de03cfb9..b849066fa334 100644
--- a/cmd/opentelemetry/app/defaultconfig/default_config.go
+++ b/cmd/opentelemetry/app/defaultconfig/default_config.go
@@ -135,7 +135,7 @@ func createProcessors(factories component.Factories) (configmodels.Processors, [
 
 func createReceivers(component ComponentType, factories component.Factories) configmodels.Receivers {
 	if component == Ingester {
-		kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig().(*kafkareceiver.Config)
+		kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig()
 		return configmodels.Receivers{
 			kafkaReceiver.Name(): kafkaReceiver,
 		}
@@ -194,7 +194,7 @@ func createExporters(component ComponentType, storageTypes string, factories com
 			exporters[elasticsearchexporter.TypeStr] = es
 		case "kafka":
 			kaf := factories.Exporters[kafkaexporter.TypeStr].CreateDefaultConfig()
-			exporters[kafkaexporter.TypeStr] = kaf
+			exporters["kafka"] = kaf
 		case "grpc-plugin":
 			grpcEx := factories.Exporters[grpcpluginexporter.TypeStr].CreateDefaultConfig()
 			exporters[grpcpluginexporter.TypeStr] = grpcEx
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/config.go b/cmd/opentelemetry/app/exporter/kafkaexporter/config.go
deleted file mode 100644
index 1664c31d7a99..000000000000
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/config.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkaexporter
-
-import (
-	"go.opentelemetry.io/collector/config/configmodels"
-
-	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
-)
-
-// Config hold configuration of Jaeger Kafka exporter/storage.
-type Config struct {
-	configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
-	kafka.Options                 `mapstructure:",squash"`
-}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/config_test.go b/cmd/opentelemetry/app/exporter/kafkaexporter/config_test.go
deleted file mode 100644
index bc11f0db2e5c..000000000000
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/config_test.go
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkaexporter
-
-import (
-	"path"
-	"testing"
-
-	"github.com/Shopify/sarama"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/config/configcheck"
-	"go.opentelemetry.io/collector/config/configtest"
-
-	"github.com/jaegertracing/jaeger/cmd/flags"
-	jConfig "github.com/jaegertracing/jaeger/pkg/config"
-	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
-)
-
-func TestDefaultConfig(t *testing.T) {
-	v, c := jConfig.Viperize(DefaultOptions().AddFlags)
-	err := c.ParseFlags([]string{""})
-	require.NoError(t, err)
-	factory := &Factory{OptionsFactory: func() *kafka.Options {
-		opts := DefaultOptions()
-		opts.InitFromViper(v)
-		return opts
-	}}
-	defaultCfg := factory.CreateDefaultConfig().(*Config)
-	assert.NoError(t, configcheck.ValidateConfig(defaultCfg))
-	assert.Equal(t, "jaeger-spans", defaultCfg.Topic)
-	assert.Equal(t, "protobuf", defaultCfg.Encoding)
-	assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Config.Brokers)
-	assert.Equal(t, sarama.WaitForLocal, defaultCfg.Config.RequiredAcks)
-	assert.Equal(t, "none", defaultCfg.Config.Authentication)
-	assert.Equal(t, "/etc/krb5.conf", defaultCfg.Config.Kerberos.ConfigPath)
-	assert.Equal(t, "kafka", defaultCfg.Config.Kerberos.ServiceName)
-	assert.Equal(t, false, defaultCfg.Config.TLS.Enabled)
-}
-
-func TestLoadConfigAndFlags(t *testing.T) {
-	factories, err := componenttest.ExampleComponents()
-	require.NoError(t, err)
-
-	v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
-	err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.producer.topic=jaeger-test", "--kafka.producer.brokers=host1,host2"})
-	require.NoError(t, err)
-
-	err = flags.TryLoadConfigFile(v)
-	require.NoError(t, err)
-
-	factory := &Factory{OptionsFactory: func() *kafka.Options {
-		opts := DefaultOptions()
-		opts.InitFromViper(v)
-		assert.Equal(t, "jaeger-test", opts.Topic)
-		assert.Equal(t, []string{"host1", "host2"}, opts.Config.Brokers)
-		return opts
-	}}
-
-	factories.Exporters[TypeStr] = factory
-	cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
-	require.NoError(t, err)
-	require.NotNil(t, cfg)
-
-	kafkaCfg := cfg.Exporters[TypeStr].(*Config)
-	assert.Equal(t, TypeStr, kafkaCfg.Name())
-	assert.Equal(t, "jaeger-prod", kafkaCfg.Topic)
-	assert.Equal(t, "emojis", kafkaCfg.Encoding)
-	assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Config.Brokers)
-	assert.Equal(t, "tls", kafkaCfg.Config.Authentication)
-	assert.Equal(t, "user", kafkaCfg.Config.PlainText.UserName)
-	assert.Equal(t, "123", kafkaCfg.Config.PlainText.Password)
-	assert.Equal(t, true, kafkaCfg.Config.TLS.Enabled)
-	assert.Equal(t, "ca.crt", kafkaCfg.Config.TLS.CAPath)
-	assert.Equal(t, "key.crt", kafkaCfg.Config.TLS.KeyPath)
-	assert.Equal(t, "cert.crt", kafkaCfg.Config.TLS.CertPath)
-	assert.Equal(t, true, kafkaCfg.Config.TLS.SkipHostVerify)
-	assert.Equal(t, "jaeger", kafkaCfg.Config.Kerberos.Realm)
-	assert.Equal(t, "/etc/foo", kafkaCfg.Config.Kerberos.ConfigPath)
-	assert.Equal(t, "from-jaeger-config", kafkaCfg.Config.Kerberos.Username)
-}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/exporter.go b/cmd/opentelemetry/app/exporter/kafkaexporter/exporter.go
deleted file mode 100644
index 92f06ec53a4c..000000000000
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/exporter.go
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkaexporter
-
-import (
-	"github.com/uber/jaeger-lib/metrics"
-	"go.opentelemetry.io/collector/component"
-
-	storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
-	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
-)
-
-// new creates new Kafka exporter
-func new(config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) {
-	f := kafka.NewFactory()
-	f.InitFromOptions(config.Options)
-	err := f.Initialize(metrics.NullFactory, params.Logger)
-	if err != nil {
-		return nil, err
-	}
-	return storageOtelExporter.NewSpanWriterExporter(config, f)
-}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/factory.go b/cmd/opentelemetry/app/exporter/kafkaexporter/factory.go
deleted file mode 100644
index 9547f9db1a53..000000000000
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/factory.go
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkaexporter
-
-import (
-	"context"
-	"fmt"
-
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configerror"
-	"go.opentelemetry.io/collector/config/configmodels"
-
-	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
-)
-
-// TypeStr defines exporter type.
-const TypeStr = "jaeger_kafka"
-
-// OptionsFactory returns initialized kafka.Options structure.
-type OptionsFactory func() *kafka.Options
-
-// DefaultOptions creates Kafka options supported by this exporter.
-func DefaultOptions() *kafka.Options {
-	return &kafka.Options{}
-}
-
-// Factory is the factory for Jaeger Kafka exporter.
-type Factory struct {
-	OptionsFactory OptionsFactory
-}
-
-var _ component.ExporterFactory = (*Factory)(nil)
-
-// Type gets the type of exporter.
-func (Factory) Type() configmodels.Type {
-	return TypeStr
-}
-
-// CreateDefaultConfig returns default configuration of Factory.
-// This function implements OTEL component.ExporterFactoryBase interface.
-func (f Factory) CreateDefaultConfig() configmodels.Exporter {
-	opts := f.OptionsFactory()
-	return &Config{
-		Options: *opts,
-		ExporterSettings: configmodels.ExporterSettings{
-			TypeVal: TypeStr,
-			NameVal: TypeStr,
-		},
-	}
-}
-
-// CreateTraceExporter creates Jaeger Kafka trace exporter.
-// This function implements OTEL component.ExporterFactory interface.
-func (Factory) CreateTraceExporter(
-	_ context.Context,
-	params component.ExporterCreateParams,
-	cfg configmodels.Exporter,
-) (component.TraceExporter, error) {
-	kafkaCfg, ok := cfg.(*Config)
-	if !ok {
-		return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
-	}
-	return new(kafkaCfg, params)
-}
-
-// CreateMetricsExporter is not implemented.
-// This function implements OTEL component.Factory interface.
-func (Factory) CreateMetricsExporter(
-	_ context.Context,
-	_ component.ExporterCreateParams,
-	_ configmodels.Exporter,
-) (component.MetricsExporter, error) {
-	return nil, configerror.ErrDataTypeIsNotSupported
-}
-
-// CreateLogsExporter creates a metrics exporter based on provided config.
-// This function implements component.ExporterFactory.
-func (f Factory) CreateLogsExporter(
-	ctx context.Context,
-	params component.ExporterCreateParams,
-	cfg configmodels.Exporter,
-) (component.LogsExporter, error) {
-	return nil, configerror.ErrDataTypeIsNotSupported
-}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/factory_test.go b/cmd/opentelemetry/app/exporter/kafkaexporter/factory_test.go
deleted file mode 100644
index 7940d0b2afd5..000000000000
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/factory_test.go
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkaexporter
-
-import (
-	"context"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configcheck"
-	"go.opentelemetry.io/collector/config/configerror"
-	"go.opentelemetry.io/collector/config/configmodels"
-	"go.uber.org/zap"
-
-	jConfig "github.com/jaegertracing/jaeger/pkg/config"
-	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
-)
-
-func TestCreateTraceExporter(t *testing.T) {
-	v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
-	opts := DefaultOptions()
-	opts.InitFromViper(v)
-	factory := &Factory{OptionsFactory: func() *kafka.Options {
-		return opts
-	}}
-	exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, factory.CreateDefaultConfig())
-	require.Nil(t, exporter)
-	assert.Contains(t, err.Error(), "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
-}
-
-func TestCreateTraceExporter_nilConfig(t *testing.T) {
-	factory := &Factory{}
-	exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, nil)
-	require.Nil(t, exporter)
-	assert.Contains(t, err.Error(), "could not cast configuration to jaeger_kafka")
-}
-
-func TestCreateMetricsExporter(t *testing.T) {
-	f := Factory{OptionsFactory: DefaultOptions}
-	mReceiver, err := f.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{}, f.CreateDefaultConfig())
-	assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
-	assert.Nil(t, mReceiver)
-}
-
-func TestCreateDefaultConfig(t *testing.T) {
-	factory := Factory{OptionsFactory: DefaultOptions}
-	cfg := factory.CreateDefaultConfig()
-	assert.NotNil(t, cfg, "failed to create default config")
-	assert.NoError(t, configcheck.ValidateConfig(cfg))
-}
-
-func TestType(t *testing.T) {
-	factory := Factory{OptionsFactory: DefaultOptions}
-	assert.Equal(t, configmodels.Type(TypeStr), factory.Type())
-}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/doc.go b/cmd/opentelemetry/app/exporter/kafkaexporter/flags.go
similarity index 75%
rename from cmd/opentelemetry/app/exporter/kafkaexporter/doc.go
rename to cmd/opentelemetry/app/exporter/kafkaexporter/flags.go
index 698e7f389a27..179d7b6e01a3 100644
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/doc.go
+++ b/cmd/opentelemetry/app/exporter/kafkaexporter/flags.go
@@ -12,5 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package kafkaexporter implements Jaeger Kafka producer as OpenTelemetry exporter.
 package kafkaexporter
+
+import (
+	"flag"
+
+	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
+)
+
+// AddFlags adds Ingester flags.
+func AddFlags(flags *flag.FlagSet) {
+	opts := &kafka.Options{}
+	opts.AddOTELFlags(flags)
+}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go
new file mode 100644
index 000000000000..e8f41602ef32
--- /dev/null
+++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go
@@ -0,0 +1,129 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafkaexporter
+
+import (
+	"context"
+
+	"github.com/spf13/viper"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/config/configtls"
+	"go.opentelemetry.io/collector/exporter/kafkaexporter"
+
+	"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/receiver/kafkareceiver"
+	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
+)
+
+// TypeStr defines exporter type.
+const TypeStr = "kafka"
+
+// Factory wraps kafkaexporter.Factory and makes the default config configurable via viper.
+// For instance this enables using flags as default values in the config object.
+type Factory struct {
+	// Wrapped is kafka exporter
+	Wrapped component.ExporterFactory
+	// Viper is used to get configuration values for default configuration
+	Viper *viper.Viper
+}
+
+var _ component.ExporterFactory = (*Factory)(nil)
+
+// Type returns the type of the exporter.
+func (f Factory) Type() configmodels.Type {
+	return f.Wrapped.Type()
+}
+
+// CreateDefaultConfig returns default configuration of Factory.
+// This function implements OTEL component.ExporterFactoryBase interface.
+func (f Factory) CreateDefaultConfig() configmodels.Exporter {
+	cfg := f.Wrapped.CreateDefaultConfig().(*kafkaexporter.Config)
+
+	// InitFromViper fails if certain fields are not set.  Setting them here
+	//  to prevent the process from exiting.
+	f.Viper.Set("kafka.producer.required-acks", "local")
+	f.Viper.Set("kafka.producer.compression", "none")
+
+	opts := &kafka.Options{}
+	opts.InitFromViper(f.Viper)
+
+	cfg.Encoding = kafkareceiver.MustOtelEncodingForJaegerEncoding(opts.Encoding)
+	cfg.Topic = opts.Topic
+	cfg.Brokers = opts.Config.Brokers
+	cfg.ProtocolVersion = opts.Config.ProtocolVersion
+
+	if opts.Config.Authentication == "kerberos" {
+		cfg.Authentication.Kerberos = &kafkaexporter.KerberosConfig{
+			ServiceName: opts.Config.Kerberos.ServiceName,
+			Realm:       opts.Config.Kerberos.Realm,
+			UseKeyTab:   opts.Config.Kerberos.UseKeyTab,
+			Username:    opts.Config.Kerberos.Username,
+			Password:    opts.Config.Kerberos.Password,
+			ConfigPath:  opts.Config.Kerberos.ConfigPath,
+			KeyTabPath:  opts.Config.Kerberos.KeyTabPath,
+		}
+	}
+
+	if opts.Config.Authentication == "plaintext" {
+		cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{
+			Username: opts.Config.PlainText.UserName,
+			Password: opts.Config.PlainText.Password,
+		}
+	}
+
+	if opts.Config.Authentication == "tls" && opts.Config.TLS.Enabled {
+		cfg.Authentication.TLS = &configtls.TLSClientSetting{
+			TLSSetting: configtls.TLSSetting{
+				CAFile:   opts.Config.TLS.CAPath,
+				CertFile: opts.Config.TLS.CertPath,
+				KeyFile:  opts.Config.TLS.KeyPath,
+			},
+			ServerName: opts.Config.TLS.ServerName,
+			Insecure:   opts.Config.TLS.SkipHostVerify,
+		}
+	}
+
+	return cfg
+}
+
+// CreateTraceExporter creates Jaeger trace exporter.
+// This function implements OTEL component.ExporterFactory interface.
+func (f Factory) CreateTraceExporter(
+	ctx context.Context,
+	params component.ExporterCreateParams,
+	cfg configmodels.Exporter,
+) (component.TraceExporter, error) {
+	return f.Wrapped.CreateTraceExporter(ctx, params, cfg)
+}
+
+// CreateMetricsExporter creates a metrics exporter based on provided config.
+// This function implements component.ExporterFactory.
+func (f Factory) CreateMetricsExporter(
+	ctx context.Context,
+	params component.ExporterCreateParams,
+	cfg configmodels.Exporter,
+) (component.MetricsExporter, error) {
+	return f.Wrapped.CreateMetricsExporter(ctx, params, cfg)
+}
+
+// CreateLogsExporter creates a metrics exporter based on provided config.
+// This function implements component.ExporterFactory.
+func (f Factory) CreateLogsExporter(
+	ctx context.Context,
+	params component.ExporterCreateParams,
+	cfg configmodels.Exporter,
+) (component.LogsExporter, error) {
+	return f.Wrapped.CreateLogsExporter(ctx, params, cfg)
+}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go
new file mode 100644
index 000000000000..df36331786a5
--- /dev/null
+++ b/cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter_test.go
@@ -0,0 +1,88 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafkaexporter
+
+import (
+	"path"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/config/configcheck"
+	"go.opentelemetry.io/collector/config/configtest"
+	otelKafkaExporter "go.opentelemetry.io/collector/exporter/kafkaexporter"
+
+	"github.com/jaegertracing/jaeger/cmd/flags"
+	jConfig "github.com/jaegertracing/jaeger/pkg/config"
+)
+
+func TestDefaultConfig(t *testing.T) {
+	v, c := jConfig.Viperize(AddFlags)
+	err := c.ParseFlags([]string{""})
+	require.NoError(t, err)
+
+	factory := &Factory{
+		Wrapped: otelKafkaExporter.NewFactory(),
+		Viper:   v,
+	}
+	defaultCfg := factory.CreateDefaultConfig().(*otelKafkaExporter.Config)
+
+	assert.NoError(t, configcheck.ValidateConfig(defaultCfg))
+	assert.Equal(t, "jaeger-spans", defaultCfg.Topic)
+	assert.Equal(t, "jaeger_proto", defaultCfg.Encoding)
+	assert.Equal(t, "", defaultCfg.ProtocolVersion)
+	assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Brokers)
+	assert.Nil(t, defaultCfg.Authentication.Kerberos)
+	assert.Nil(t, defaultCfg.Authentication.TLS)
+	assert.Nil(t, defaultCfg.Authentication.PlainText)
+}
+
+func TestLoadConfigAndFlags(t *testing.T) {
+	v, c := jConfig.Viperize(AddFlags, flags.AddConfigFileFlag)
+	err := c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.producer.topic=jaeger-test", "--kafka.producer.brokers=host1,host2"})
+	require.NoError(t, err)
+
+	err = flags.TryLoadConfigFile(v)
+	require.NoError(t, err)
+
+	factory := &Factory{
+		Wrapped: otelKafkaExporter.NewFactory(),
+		Viper:   v,
+	}
+
+	factories, err := componenttest.ExampleComponents()
+	factories.Exporters[TypeStr] = factory
+	cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
+	require.NoError(t, err)
+	require.NotNil(t, cfg)
+
+	kafkaCfg := cfg.Exporters[TypeStr].(*otelKafkaExporter.Config)
+	require.NotNil(t, kafkaCfg)
+
+	assert.Equal(t, TypeStr, kafkaCfg.Name())
+	assert.Equal(t, "jaeger-prod", kafkaCfg.Topic)
+	assert.Equal(t, "emojis", kafkaCfg.Encoding)
+	assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Brokers)
+	assert.Equal(t, "user", kafkaCfg.Authentication.PlainText.Username)
+	assert.Equal(t, "123", kafkaCfg.Authentication.PlainText.Password)
+	assert.Equal(t, "ca.crt", kafkaCfg.Authentication.TLS.CAFile)
+	assert.Equal(t, "key.crt", kafkaCfg.Authentication.TLS.KeyFile)
+	assert.Equal(t, "cert.crt", kafkaCfg.Authentication.TLS.CertFile)
+	assert.Equal(t, true, kafkaCfg.Authentication.TLS.Insecure)
+	assert.Equal(t, "jaeger", kafkaCfg.Authentication.Kerberos.Realm)
+	assert.Equal(t, "/etc/foo", kafkaCfg.Authentication.Kerberos.ConfigPath)
+	assert.Equal(t, "from-jaeger-config", kafkaCfg.Authentication.Kerberos.Username)
+}
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/config.yaml b/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/config.yaml
index 1ed1a58f895d..b0a712554f88 100644
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/config.yaml
+++ b/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/config.yaml
@@ -1,33 +1,31 @@
-receivers:
-  examplereceiver:
+receivers:	
+  examplereceiver:	
 
-processors:
-  exampleprocessor:
+processors:	
+  exampleprocessor:	
 
-exporters:
-  jaeger_kafka:
-    topic: jaeger-prod
-    encoding: emojis
-    brokers: foo,bar
-    authentication:
-      type: tls
-      plaintext:
-        username: user
-        password: 123
-      tls:
-        enabled: true
-        ca: ca.crt
-        key: key.crt
-        cert: cert.crt
-        skip_host_verify: true
-      kerberos:
-        realm: jaeger
-        config_file: /etc/foo
+exporters:	
+  kafka:	
+    topic: jaeger-prod	
+    encoding: emojis	
+    brokers: foo,bar	
+    auth:	
+      plain_text:	
+        username: user	
+        password: 123	
+      tls:	
+        ca_file: ca.crt	
+        key_file: key.crt	
+        cert_file: cert.crt	
+        insecure: true	
+      kerberos:	
+        realm: jaeger	
+        config_file: /etc/foo	
 
 
-service:
-  pipelines:
-    traces:
-      receivers: [examplereceiver]
-      processors: [exampleprocessor]
-      exporters: [jaeger_kafka]
+service:	
+  pipelines:	
+    traces:	
+      receivers: [examplereceiver]	
+      processors: [exampleprocessor]	
+      exporters: [kafka]
\ No newline at end of file
diff --git a/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/jaeger-config.yaml b/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/jaeger-config.yaml
index 99563d92dd1d..9e0eabd604c5 100644
--- a/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/jaeger-config.yaml
+++ b/cmd/opentelemetry/app/exporter/kafkaexporter/testdata/jaeger-config.yaml
@@ -1,3 +1,7 @@
 kafka.producer:
+  authentication: kerberos
+  encoding: protobuf
+  topic: jaeger-prod
   kerberos:
     username: from-jaeger-config
+    realm: jaeger
diff --git a/cmd/opentelemetry/app/flags.go b/cmd/opentelemetry/app/flags.go
index 957bca7ea822..3573cf00faac 100644
--- a/cmd/opentelemetry/app/flags.go
+++ b/cmd/opentelemetry/app/flags.go
@@ -63,7 +63,7 @@ func AddStorageFlags(storage string, enableArchive bool) (func(*flag.FlagSet), e
 				flagFn = append(flagFn, esStorage.NewOptions("es-archive").AddFlags)
 			}
 		case "kafka":
-			flagFn = append(flagFn, kafkaexporter.DefaultOptions().AddFlags)
+			flagFn = append(flagFn, kafkaexporter.AddFlags)
 		case "grpc-plugin":
 			flagFn = append(flagFn, grpcpluginexporter.DefaultOptions().AddFlags)
 		default:
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/config.go b/cmd/opentelemetry/app/receiver/kafkareceiver/config.go
deleted file mode 100644
index 94035fe18629..000000000000
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/config.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkareceiver
-
-import (
-	"go.opentelemetry.io/collector/config/configmodels"
-
-	ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
-)
-
-// Config hold configuration for Jaeger kafka receiver/ingester.
-type Config struct {
-	configmodels.ReceiverSettings `mapstructure:",squash"`
-	ingesterApp.Options           `mapstructure:",squash"`
-}
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/config_test.go b/cmd/opentelemetry/app/receiver/kafkareceiver/config_test.go
deleted file mode 100644
index b84b725a45ce..000000000000
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/config_test.go
+++ /dev/null
@@ -1,92 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkareceiver
-
-import (
-	"path"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/config/configcheck"
-	"go.opentelemetry.io/collector/config/configtest"
-
-	"github.com/jaegertracing/jaeger/cmd/flags"
-	"github.com/jaegertracing/jaeger/cmd/ingester/app"
-	jConfig "github.com/jaegertracing/jaeger/pkg/config"
-)
-
-func TestDefaultConfig(t *testing.T) {
-	v, c := jConfig.Viperize(app.AddFlags)
-	err := c.ParseFlags([]string{""})
-	require.NoError(t, err)
-	factory := &Factory{OptionsFactory: func() *app.Options {
-		opts := DefaultOptions()
-		opts.InitFromViper(v)
-		return opts
-	}}
-	defaultCfg := factory.CreateDefaultConfig().(*Config)
-	assert.NoError(t, configcheck.ValidateConfig(defaultCfg))
-	assert.Equal(t, "jaeger-spans", defaultCfg.Topic)
-	assert.Equal(t, "protobuf", defaultCfg.Encoding)
-	assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Brokers)
-	assert.Equal(t, "none", defaultCfg.Authentication)
-	assert.Equal(t, "/etc/krb5.conf", defaultCfg.Kerberos.ConfigPath)
-	assert.Equal(t, "kafka", defaultCfg.Kerberos.ServiceName)
-	assert.Equal(t, false, defaultCfg.TLS.Enabled)
-}
-
-func TestLoadConfigAndFlags(t *testing.T) {
-	factories, err := componenttest.ExampleComponents()
-	require.NoError(t, err)
-
-	v, c := jConfig.Viperize(app.AddFlags, flags.AddConfigFileFlag)
-	err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.consumer.topic=jaeger-test", "--kafka.consumer.brokers=host1,host2", "--kafka.consumer.tls.cert=from-flag"})
-	require.NoError(t, err)
-
-	err = flags.TryLoadConfigFile(v)
-	require.NoError(t, err)
-
-	factory := &Factory{OptionsFactory: func() *app.Options {
-		opts := DefaultOptions()
-		opts.InitFromViper(v)
-		assert.Equal(t, "jaeger-test", opts.Topic)
-		assert.Equal(t, []string{"host1", "host2"}, opts.Brokers)
-		return opts
-	}}
-
-	factories.Receivers[TypeStr] = factory
-	cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
-	require.NoError(t, err)
-	require.NotNil(t, cfg)
-
-	kafkaCfg := cfg.Receivers[TypeStr].(*Config)
-	assert.Equal(t, TypeStr, kafkaCfg.Name())
-	assert.Equal(t, "jaeger-prod", kafkaCfg.Topic)
-	assert.Equal(t, "emojis", kafkaCfg.Encoding)
-	assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Options.Brokers)
-	assert.Equal(t, "tls", kafkaCfg.Options.Authentication)
-	assert.Equal(t, "user", kafkaCfg.Options.PlainText.UserName)
-	assert.Equal(t, "123", kafkaCfg.Options.PlainText.Password)
-	assert.Equal(t, true, kafkaCfg.Options.TLS.Enabled)
-	assert.Equal(t, "ca.crt", kafkaCfg.Options.TLS.CAPath)
-	assert.Equal(t, "key.crt", kafkaCfg.Options.TLS.KeyPath)
-	assert.Equal(t, "from-flag", kafkaCfg.Options.TLS.CertPath)
-	assert.Equal(t, true, kafkaCfg.Options.TLS.SkipHostVerify)
-	assert.Equal(t, "jaeger", kafkaCfg.Options.Kerberos.Realm)
-	assert.Equal(t, "/etc/foo", kafkaCfg.Options.Kerberos.ConfigPath)
-	assert.Equal(t, "from-jaeger-config", kafkaCfg.Options.Kerberos.Username)
-}
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/doc.go b/cmd/opentelemetry/app/receiver/kafkareceiver/doc.go
deleted file mode 100644
index 88cbd83e37ba..000000000000
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/doc.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package kafkareceiver implements Jaeger Kafka consumer as OpenTelemetry receiver.
-package kafkareceiver
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/factory.go b/cmd/opentelemetry/app/receiver/kafkareceiver/factory.go
deleted file mode 100644
index ba65667c62e9..000000000000
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/factory.go
+++ /dev/null
@@ -1,107 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkareceiver
-
-import (
-	"context"
-	"fmt"
-
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configerror"
-	"go.opentelemetry.io/collector/config/configmodels"
-	"go.opentelemetry.io/collector/consumer"
-
-	ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
-)
-
-// TypeStr defines exporter type.
-const TypeStr = "jaeger_kafka"
-
-// OptionsFactory returns initialized ingester app.Options structure.
-type OptionsFactory func() *ingesterApp.Options
-
-// DefaultOptions creates Kafka options supported by this receiver.
-func DefaultOptions() *ingesterApp.Options {
-	return &ingesterApp.Options{}
-}
-
-// Factory is used to build the receiver.
-type Factory struct {
-	OptionsFactory OptionsFactory
-}
-
-var _ component.ReceiverFactory = (*Factory)(nil)
-
-// Type returns the receiver type.
-func (f Factory) Type() configmodels.Type {
-	return TypeStr
-}
-
-// CreateDefaultConfig creates default config.
-// This function implements OTEL component.ReceiverFactoryBase interface.
-func (f Factory) CreateDefaultConfig() configmodels.Receiver {
-	opts := f.OptionsFactory()
-	return &Config{
-		Options: *opts,
-		ReceiverSettings: configmodels.ReceiverSettings{
-			TypeVal: TypeStr,
-			NameVal: TypeStr,
-		},
-	}
-}
-
-// CustomUnmarshaler returns custom marshaller.
-// This function implements OTEL component.ReceiverFactoryBase interface.
-func (f Factory) CustomUnmarshaler() component.CustomUnmarshaler {
-	return nil
-}
-
-// CreateTraceReceiver returns Kafka receiver.
-// This function implements OTEL component.ReceiverFactory.
-func (f Factory) CreateTraceReceiver(
-	_ context.Context,
-	params component.ReceiverCreateParams,
-	cfg configmodels.Receiver,
-	nextConsumer consumer.TraceConsumer,
-) (component.TraceReceiver, error) {
-	kafkaCfg, ok := cfg.(*Config)
-	if !ok {
-		return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
-	}
-	return new(kafkaCfg, nextConsumer, params)
-}
-
-// CreateMetricsReceiver returns metrics receiver.
-// This function implements OTEL component.ReceiverFactory.
-func (f Factory) CreateMetricsReceiver(
-	_ context.Context,
-	_ component.ReceiverCreateParams,
-	_ configmodels.Receiver,
-	_ consumer.MetricsConsumer,
-) (component.MetricsReceiver, error) {
-	return nil, configerror.ErrDataTypeIsNotSupported
-}
-
-// CreateLogsReceiver creates a receiver based on the config.
-// If the receiver type does not support logs or if the config is not valid
-// error will be returned instead.
-func (f Factory) CreateLogsReceiver(
-	ctx context.Context,
-	params component.ReceiverCreateParams,
-	cfg configmodels.Receiver,
-	nextConsumer consumer.LogsConsumer,
-) (component.LogsReceiver, error) {
-	return nil, configerror.ErrDataTypeIsNotSupported
-}
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/factory_test.go b/cmd/opentelemetry/app/receiver/kafkareceiver/factory_test.go
deleted file mode 100644
index 7fb857e3a1ff..000000000000
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/factory_test.go
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright (c) 2020 The Jaeger Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kafkareceiver
-
-import (
-	"context"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/config/configcheck"
-	"go.opentelemetry.io/collector/config/configerror"
-	"go.opentelemetry.io/collector/config/configmodels"
-	"go.uber.org/zap"
-
-	ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
-	jConfig "github.com/jaegertracing/jaeger/pkg/config"
-)
-
-func TestCreateTraceReceiver(t *testing.T) {
-	v, _ := jConfig.Viperize(ingesterApp.AddFlags)
-	opts := DefaultOptions()
-	opts.InitFromViper(v)
-	factory := &Factory{OptionsFactory: func() *ingesterApp.Options {
-		return opts
-	}}
-	exporter, err := factory.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, factory.CreateDefaultConfig(), nil)
-	require.Nil(t, exporter)
-	assert.Contains(t, err.Error(), "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
-}
-
-func TestCreateTraceExporter_nilConfig(t *testing.T) {
-	factory := &Factory{}
-	exporter, err := factory.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{}, nil, nil)
-	require.Nil(t, exporter)
-	assert.Contains(t, err.Error(), "could not cast configuration to jaeger_kafka")
-}
-
-func TestCreateMetricsExporter(t *testing.T) {
-	f := Factory{OptionsFactory: DefaultOptions}
-	mReceiver, err := f.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, f.CreateDefaultConfig(), nil)
-	assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
-	assert.Nil(t, mReceiver)
-}
-
-func TestCreateDefaultConfig(t *testing.T) {
-	factory := Factory{OptionsFactory: DefaultOptions}
-	cfg := factory.CreateDefaultConfig()
-	assert.NotNil(t, cfg, "failed to create default config")
-	assert.NoError(t, configcheck.ValidateConfig(cfg))
-}
-
-func TestType(t *testing.T) {
-	factory := Factory{OptionsFactory: DefaultOptions}
-	assert.Equal(t, configmodels.Type(TypeStr), factory.Type())
-}
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go b/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go
index df7860496f31..91be467e2173 100644
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go
+++ b/cmd/opentelemetry/app/receiver/kafkareceiver/flags.go
@@ -22,5 +22,5 @@ import (
 
 // AddFlags adds Ingester flags.
 func AddFlags(flags *flag.FlagSet) {
-	ingesterApp.AddFlags(flags)
+	ingesterApp.AddOTELFlags(flags)
 }
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
index c698ef3b6625..3b4a3a2fd4ff 100644
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
+++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
@@ -17,79 +17,135 @@ package kafkareceiver
 import (
 	"context"
 
-	"github.com/uber/jaeger-lib/metrics"
+	"github.com/Shopify/sarama"
+	"github.com/spf13/viper"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/config/configtls"
 	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/obsreport"
-	jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
-	"go.uber.org/zap"
-
-	"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
-	ingester "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
-	"github.com/jaegertracing/jaeger/model"
-	"github.com/jaegertracing/jaeger/storage/spanstore"
-)
+	"go.opentelemetry.io/collector/exporter/kafkaexporter"
+	"go.opentelemetry.io/collector/receiver/kafkareceiver"
 
-var (
-	_ spanstore.Writer   = (*writer)(nil)
-	_ component.Receiver = (*kafkaReceiver)(nil)
+	ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
+	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
 )
 
-type kafkaReceiver struct {
-	logger   *zap.Logger
-	consumer *ingester.Consumer
+// TypeStr defines receiver type.
+const TypeStr = "kafka"
+
+// Factory wraps kafkareceiver.Factory and makes the default config configurable via viper.
+// For instance this enables using flags as default values in the config object.
+type Factory struct {
+	// Wrapped is Kafka receiver.
+	Wrapped component.ReceiverFactory
+	// Viper is used to get configuration values for default configuration
+	Viper *viper.Viper
 }
 
-type writer struct {
-	receiver     string
-	nextConsumer consumer.TraceConsumer
+var _ component.ReceiverFactory = (*Factory)(nil)
+
+// Type returns the type of the receiver.
+func (f *Factory) Type() configmodels.Type {
+	return f.Wrapped.Type()
 }
 
-func new(
-	config *Config,
-	nextConsumer consumer.TraceConsumer,
+// CreateDefaultConfig returns default configuration of Factory.
+// This function implements OTEL component.ReceiverFactoryBase interface.
+func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
+	cfg := f.Wrapped.CreateDefaultConfig().(*kafkareceiver.Config)
+	// load jaeger config
+	opts := &ingesterApp.Options{}
+	opts.InitFromViper(f.Viper)
+
+	cfg.Brokers = opts.Brokers
+	cfg.ClientID = opts.ClientID
+	cfg.Encoding = MustOtelEncodingForJaegerEncoding(opts.Encoding)
+	cfg.GroupID = opts.GroupID
+	cfg.Topic = opts.Topic
+	cfg.ProtocolVersion = opts.ProtocolVersion
+
+	// kafka consumer groups require a min version of V0_10_2_0.  if no version is specified
+	//  we will assume this
+	if len(cfg.ProtocolVersion) == 0 {
+		cfg.ProtocolVersion = sarama.V0_10_2_0.String()
+	}
+
+	if opts.Authentication == "kerberos" {
+		cfg.Authentication.Kerberos = &kafkaexporter.KerberosConfig{
+			ServiceName: opts.Kerberos.ServiceName,
+			Realm:       opts.Kerberos.Realm,
+			UseKeyTab:   opts.Kerberos.UseKeyTab,
+			Username:    opts.Kerberos.Username,
+			Password:    opts.Kerberos.Password,
+			ConfigPath:  opts.Kerberos.ConfigPath,
+			KeyTabPath:  opts.Kerberos.KeyTabPath,
+		}
+	}
+
+	if opts.Authentication == "plaintext" {
+		cfg.Authentication.PlainText = &kafkaexporter.PlainTextConfig{
+			Username: opts.PlainText.UserName,
+			Password: opts.PlainText.Password,
+		}
+	}
+
+	if opts.Authentication == "tls" && opts.TLS.Enabled {
+		cfg.Authentication.TLS = &configtls.TLSClientSetting{
+			TLSSetting: configtls.TLSSetting{
+				CAFile:   opts.TLS.CAPath,
+				CertFile: opts.TLS.CertPath,
+				KeyFile:  opts.TLS.KeyPath,
+			},
+			ServerName: opts.TLS.ServerName,
+			Insecure:   opts.TLS.SkipHostVerify,
+		}
+	}
+
+	return cfg
+}
+
+// CreateTraceReceiver creates Jaeger receiver trace receiver.
+// This function implements OTEL component.ReceiverFactory interface.
+func (f *Factory) CreateTraceReceiver(
+	ctx context.Context,
 	params component.ReceiverCreateParams,
+	cfg configmodels.Receiver,
+	nextConsumer consumer.TraceConsumer,
 ) (component.TraceReceiver, error) {
-	w := &writer{receiver: config.Name(), nextConsumer: nextConsumer}
-	consumer, err := builder.CreateConsumer(
-		params.Logger,
-		metrics.NullFactory,
-		w,
-		config.Options)
-	if err != nil {
-		return nil, err
-	}
-	return &kafkaReceiver{
-		consumer: consumer,
-		logger:   params.Logger,
-	}, nil
+	return f.Wrapped.CreateTraceReceiver(ctx, params, cfg, nextConsumer)
 }
 
-// Start starts the receiver.
-func (r kafkaReceiver) Start(_ context.Context, _ component.Host) error {
-	r.consumer.Start()
-	return nil
+// CreateMetricsReceiver creates a metrics receiver based on provided config.
+// This function implements component.ReceiverFactory.
+func (f *Factory) CreateMetricsReceiver(
+	ctx context.Context,
+	params component.ReceiverCreateParams,
+	cfg configmodels.Receiver,
+	nextConsumer consumer.MetricsConsumer,
+) (component.MetricsReceiver, error) {
+	return f.Wrapped.CreateMetricsReceiver(ctx, params, cfg, nextConsumer)
 }
 
-// Shutdown shutdowns the receiver.
-func (r kafkaReceiver) Shutdown(_ context.Context) error {
-	return r.consumer.Close()
+// CreateLogsReceiver creates a receiver based on the config.
+// If the receiver type does not support logs or if the config is not valid
+// error will be returned instead.
+func (f Factory) CreateLogsReceiver(
+	ctx context.Context,
+	params component.ReceiverCreateParams,
+	cfg configmodels.Receiver,
+	nextConsumer consumer.LogsConsumer,
+) (component.LogsReceiver, error) {
+	return f.Wrapped.CreateLogsReceiver(ctx, params, cfg, nextConsumer)
 }
 
-// WriteSpan writes a span to the next consumer.
-func (w writer) WriteSpan(ctx context.Context, span *model.Span) error {
-	batch := model.Batch{
-		Spans:   []*model.Span{span},
-		Process: span.Process,
+// MustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
+func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
+	switch jaegerEncoding {
+	case kafka.EncodingProto:
+		return "jaeger_proto"
+	case kafka.EncodingJSON:
+		return "jaeger_json"
 	}
-	traces := jaegertranslator.ProtoBatchToInternalTraces(batch)
-	return w.nextConsumer.ConsumeTraces(w.addContextMetrics(ctx), traces)
-}
 
-// addContextMetrics decorates the context with labels used in metrics later.
-func (w writer) addContextMetrics(ctx context.Context) context.Context {
-	// TODO too many mallocs here, should be a cheaper way
-	ctx = obsreport.ReceiverContext(ctx, w.receiver, "kafka", "kafka")
-	ctx = obsreport.StartTraceDataReceiveOp(ctx, TypeStr, "kafka")
-	return ctx
+	panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
 }
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go
new file mode 100644
index 000000000000..921161bc1997
--- /dev/null
+++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go
@@ -0,0 +1,130 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafkareceiver
+
+import (
+	"path"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/config/configcheck"
+	"go.opentelemetry.io/collector/config/configtest"
+	otelKafkaReceiver "go.opentelemetry.io/collector/receiver/kafkareceiver"
+
+	"github.com/jaegertracing/jaeger/cmd/flags"
+	jConfig "github.com/jaegertracing/jaeger/pkg/config"
+	"github.com/jaegertracing/jaeger/plugin/storage/kafka"
+)
+
+func TestDefaultConfig(t *testing.T) {
+	v, c := jConfig.Viperize(AddFlags)
+	err := c.ParseFlags([]string{""})
+	require.NoError(t, err)
+
+	factory := &Factory{
+		Wrapped: otelKafkaReceiver.NewFactory(),
+		Viper:   v,
+	}
+	defaultCfg := factory.CreateDefaultConfig().(*otelKafkaReceiver.Config)
+
+	assert.NoError(t, configcheck.ValidateConfig(defaultCfg))
+	assert.Equal(t, "jaeger-spans", defaultCfg.Topic)
+	assert.Equal(t, "jaeger_proto", defaultCfg.Encoding)
+	assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Brokers)
+	assert.Equal(t, "jaeger-ingester", defaultCfg.ClientID)
+	assert.Equal(t, "jaeger-ingester", defaultCfg.GroupID)
+	assert.Equal(t, "0.10.2.0", defaultCfg.ProtocolVersion)
+	assert.Nil(t, defaultCfg.Authentication.Kerberos)
+	assert.Nil(t, defaultCfg.Authentication.TLS)
+	assert.Nil(t, defaultCfg.Authentication.PlainText)
+}
+
+func TestLoadConfigAndFlags(t *testing.T) {
+	v, c := jConfig.Viperize(AddFlags, flags.AddConfigFileFlag)
+	err := c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.consumer.topic=jaeger-test", "--kafka.consumer.brokers=host1,host2", "--kafka.consumer.group-id=from-flag", "--kafka.consumer.protocol-version=1.1", "--kafka.consumer.kerberos.realm=from-flag"})
+	require.NoError(t, err)
+
+	err = flags.TryLoadConfigFile(v)
+	require.NoError(t, err)
+
+	factory := &Factory{
+		Wrapped: otelKafkaReceiver.NewFactory(),
+		Viper:   v,
+	}
+
+	factories, err := componenttest.ExampleComponents()
+	factories.Receivers[TypeStr] = factory
+	cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
+	require.NoError(t, err)
+	require.NotNil(t, cfg)
+
+	kafkaCfg := cfg.Receivers[TypeStr].(*otelKafkaReceiver.Config)
+	require.NotNil(t, kafkaCfg)
+
+	assert.Equal(t, TypeStr, kafkaCfg.Name())
+	assert.Equal(t, "jaeger-prod", kafkaCfg.Topic)
+	assert.Equal(t, "emojis", kafkaCfg.Encoding)
+	assert.Equal(t, "1.1", kafkaCfg.ProtocolVersion)
+	assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Brokers)
+	assert.Equal(t, "user", kafkaCfg.Authentication.PlainText.Username)
+	assert.Equal(t, "123", kafkaCfg.Authentication.PlainText.Password)
+	assert.Equal(t, "ca.crt", kafkaCfg.Authentication.TLS.CAFile)
+	assert.Equal(t, "key.crt", kafkaCfg.Authentication.TLS.KeyFile)
+	assert.Equal(t, true, kafkaCfg.Authentication.TLS.Insecure)
+	assert.Equal(t, "from-flag", kafkaCfg.Authentication.Kerberos.Realm)
+	assert.Equal(t, "/etc/foo", kafkaCfg.Authentication.Kerberos.ConfigPath)
+	assert.Equal(t, "from-jaeger-config", kafkaCfg.Authentication.Kerberos.Username)
+}
+
+func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
+	tests := []struct {
+		in           string
+		expected     string
+		expectsPanic bool
+	}{
+		{
+			in:       kafka.EncodingProto,
+			expected: "jaeger_proto",
+		},
+		{
+			in:       kafka.EncodingJSON,
+			expected: "jaeger_json",
+		},
+		{
+			in:           "not-an-encoding",
+			expectsPanic: true,
+		},
+	}
+
+	for _, tt := range tests {
+		if tt.expectsPanic {
+			assertPanic(t, func() { MustOtelEncodingForJaegerEncoding(tt.in) })
+			continue
+		}
+
+		assert.Equal(t, tt.expected, MustOtelEncodingForJaegerEncoding(tt.in))
+	}
+}
+
+func assertPanic(t *testing.T, f func()) {
+	defer func() {
+		if r := recover(); r == nil {
+			t.Errorf("The code did not panic")
+		}
+	}()
+	f()
+}
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/config.yaml b/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/config.yaml
index 19c8694bde23..983f26419d78 100644
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/config.yaml
+++ b/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/config.yaml
@@ -1,32 +1,29 @@
-receivers:
-  jaeger_kafka:
-    brokers: foo,bar
-    topic: jaeger-prod
-    encoding: emojis
-    authentication:
-      type: tls
-      plaintext:
-        username: user
-        password: 123
-      tls:
-        enabled: true
-        ca: ca.crt
-        key: key.crt
-        skip_host_verify: true
-      kerberos:
-        realm: jaeger
-        config_file: /etc/foo
+receivers:	
+  kafka:	
+    brokers: foo,bar	
+    topic: jaeger-prod	
+    encoding: emojis	
+    auth:	
+      plain_text:	
+        username: user	
+        password: 123	
+      tls:	
+        ca_file: ca.crt	
+        key_file: key.crt	
+        insecure: true	
+      kerberos:	
+        config_file: /etc/foo	
 
 
-processors:
-  exampleprocessor:
+processors:	
+  exampleprocessor:	
 
-exporters:
-  exampleexporter:
+exporters:	
+  exampleexporter:	
 
-service:
-  pipelines:
-    traces:
-      receivers: [jaeger_kafka]
-      processors: [exampleprocessor]
-      exporters: [exampleexporter]
+service:	
+  pipelines:	
+    traces:	
+      receivers: [kafka]	
+      processors: [exampleprocessor]	
+      exporters: [exampleexporter]
\ No newline at end of file
diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/jaeger-config.yaml b/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/jaeger-config.yaml
index 6a9fb59fba40..ff831076e9db 100644
--- a/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/jaeger-config.yaml
+++ b/cmd/opentelemetry/app/receiver/kafkareceiver/testdata/jaeger-config.yaml
@@ -1,3 +1,7 @@
 kafka.consumer:
+  authentication: kerberos
+  encoding: json
+  topic: jaeger-prod
   kerberos:
     username: from-jaeger-config
+    realm: jaeger
diff --git a/cmd/opentelemetry/go.sum b/cmd/opentelemetry/go.sum
index 6c870f299079..2bf928958dd9 100644
--- a/cmd/opentelemetry/go.sum
+++ b/cmd/opentelemetry/go.sum
@@ -1153,6 +1153,8 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io v0.1.0 h1:EANZoRCOP+A3faIlw/iN6YEWoYb1vleZRKm1EvH8T48=
+go.opentelemetry.io/collector v0.10.0 h1:4T3oARuePrFo8PR6ZYMploL8EcucoaB7tV2hRYg2L+k=
 go.opentelemetry.io/collector v0.10.1-0.20200917170114-639b9a80ed46 h1:QfvrAwDwB6zp4KuCz6pQxjJaiMbEWWl1Cm/x9Zfnwpk=
 go.opentelemetry.io/collector v0.10.1-0.20200917170114-639b9a80ed46/go.mod h1:PFIDJqfiYoOFyOZTNczWwxzzV0D3rmITGXtG0UAt/ug=
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
diff --git a/examples/hotrod/services/frontend/gen_assets.go b/examples/hotrod/services/frontend/gen_assets.go
index 5b2c20563df1..f2899b5219bd 100644
--- a/examples/hotrod/services/frontend/gen_assets.go
+++ b/examples/hotrod/services/frontend/gen_assets.go
@@ -213,7 +213,7 @@ var _escData = map[string]*_escFile{
 		name:    "index.html",
 		local:   "examples/hotrod/services/frontend/web_assets/index.html",
 		size:    3530,
-		modtime: 1597078961,
+		modtime: 1597936165,
 		compressed: `
 H4sIAAAAAAAC/9RXX1PbSBJ/96fom82d5DOSMIZAjOUtDmcJ2cuSM5Ct3FYeRqO2NEaaUWZGtlmK7341
 +mNkSKru3i5+gJnunu7+9V97kpo8m/YAJjkaCiylSqMJyeX1lXdycvTGG5InrqA5hmTFcV1IZQgwKQwK
@@ -247,7 +247,7 @@ qcf+ae+xLqTO1+RJUP+E+08AAAD//1QoAETKDQAA
 		name:    "jquery-3.1.1.min.js",
 		local:   "examples/hotrod/services/frontend/web_assets/jquery-3.1.1.min.js",
 		size:    86709,
-		modtime: 1597079278,
+		modtime: 1597936165,
 		compressed: `
 H4sIAAAAAAAC/8y9fZebONIo/v/9FG02D4PaMm1nZvbewa1wMslkN7vztpPMzO5iskeAwLgxuAGnO2PY
 z/47KkkgME5mn+fec34nJ21A71KpVFWql5vr2dXub0dWfrh6/7m9sldXzZUVIvXtVXHMI1qnRX7VXO3u
diff --git a/plugin/storage/es/mappings/gen_assets.go b/plugin/storage/es/mappings/gen_assets.go
index 477ebb11f658..77195053de8e 100644
--- a/plugin/storage/es/mappings/gen_assets.go
+++ b/plugin/storage/es/mappings/gen_assets.go
@@ -213,7 +213,7 @@ var _escData = map[string]*_escFile{
 		name:    ".nocover",
 		local:   "plugin/storage/es/mappings/.nocover",
 		size:    43,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/youSSzJzFYoSEzOTkxPVcjILy4pVkgsLcnXTU/NSy1KLElNUUjLzEkt1uMCBAAA//8y
 IKK1KwAAAA==
@@ -224,7 +224,7 @@ IKK1KwAAAA==
 		name:    "jaeger-dependencies-7.json",
 		local:   "plugin/storage/es/mappings/jaeger-dependencies-7.json",
 		size:    283,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/2zPz0vDQBDF8Xv+imXxVNrFi5fcqlYU/EWK52GbfU1HknHdmYBQ8r9LRA/S3t/nC+9Y
 OedZEr4oRzMUUV87v3iP6FBWCRmSIC1DVwu/nNcKM5ZOfT3jPx5kHHYo9LEnPcSS5szFkej57el609DL
@@ -237,7 +237,7 @@ rYyonPvp+t/efGyqpuo7AAD//66cHf8bAQAA
 		name:    "jaeger-dependencies.json",
 		local:   "plugin/storage/es/mappings/jaeger-dependencies.json",
 		size:    277,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/2zPzUoDMRTF8f08Rbi4Km1w4ya7qhUFv5ji+pJOTqeRTIy5d0Ao8+4y4kbG/fn94Zwb
 Y0gxlOQV5Ayt3j161E1AQQ7IXYRsVrSedwLVmHshNzNjKOaAL5vH4YDKH0eWk69ByJmLM/Pz29P1ruWX
@@ -250,7 +250,7 @@ tI5ojPnp0m9vPjY1U/MdAAD//5ZQx/QVAQAA
 		name:    "jaeger-service-7.json",
 		local:   "plugin/storage/es/mappings/jaeger-service-7.json",
 		size:    878,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/8ySwW7aQBCG736K1agnBFZViR72RluqVmppBcopikaDPdibeNeb3YEEIb97ZGTAhHDL
 IReP5H++35/t3SVKgXE5P6MnEQ4uglYwuCcuOIwih43JeDSAYbsYWcS4IoJuuQOZurVdcsB6hbGkkLcN
@@ -265,7 +265,7 @@ FMjWM2h44O1THXIYnqemcHVgpGW9YdBfxl97cdPfBU9SoiXJStAgVKQDOMZN8oroOftQZxzjh9DuXNJr
 		name:    "jaeger-service.json",
 		local:   "plugin/storage/es/mappings/jaeger-service.json",
 		size:    1060,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/8yTT2/UMBDF7/kU1ojTamshpHLwrUARSFDQVpwQGs3Gs1mD7RjbKayqfHfk4pKkW+2J
 Q3PIn/F78/xz7NtGCMjsgqXMoASsvhN3HM8SxxvT8tkK1kWSOGfjuwSqOIQA4zX/ln5wW47Y7zDtKeoE
@@ -281,7 +281,7 @@ TmGdQJrhQAmkbHr//7o382e5j83Y/AkAAP//qd2MzCQEAAA=
 		name:    "jaeger-span-7.json",
 		local:   "plugin/storage/es/mappings/jaeger-span-7.json",
 		size:    3420,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/+xWXW+UQBR951eQG5+aLTEm9YG3amtsYqtp65Mxk7twYaedL2fuVjcN/93A0hYKbE2k
 xhhflix3zuHcmXsO3EZxDNLk9EM4ZCZvAqQx7F0hleT3g0OzvweLuF4WiFmaMkBao+5wiVnrJXlhCxFW
@@ -300,7 +300,7 @@ s/IPX+q/S7/jOB55dNyfveXPdm4jPpxtT0d9N2fQzT1xE5+s9W8VVdHPAAAA//+SuQbQXA0AAA==
 		name:    "jaeger-span.json",
 		local:   "plugin/storage/es/mappings/jaeger-span.json",
 		size:    3830,
-		modtime: 1593482248,
+		modtime: 1597437395,
 		compressed: `
 H4sIAAAAAAAC/+xW0W/TPhB+z18RnX5PUxf9hDQe8jbYEJPYQNt4Qsi6JpfUm2Mb+zqopv7vKE1Lm9ZJ
 QGoQEvShbWx/391n333xcxTHwFRZhUyQxnDygFSSO/UW9ekJTOp5T8xSlx7Senkcg9Q5fUv0vJqSE6YQ
diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go
index 4f5cca2e0a35..57af48c63435 100644
--- a/plugin/storage/kafka/options.go
+++ b/plugin/storage/kafka/options.go
@@ -117,23 +117,6 @@ type Options struct {
 
 // AddFlags adds flags for Options
 func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
-	flagSet.String(
-		configPrefix+suffixBrokers,
-		defaultBroker,
-		"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
-	flagSet.String(
-		configPrefix+suffixTopic,
-		defaultTopic,
-		"The name of the kafka topic")
-	flagSet.String(
-		configPrefix+suffixProtocolVersion,
-		"",
-		"Kafka protocol version - must be supported by kafka server")
-	flagSet.String(
-		configPrefix+suffixEncoding,
-		defaultEncoding,
-		fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
-	)
 	flagSet.String(
 		configPrefix+suffixRequiredAcks,
 		defaultRequiredAcks,
@@ -169,6 +152,28 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
 		defaultBatchMaxMessages,
 		"(experimental) Maximum number of message to batch before sending records to Kafka",
 	)
+	opt.AddOTELFlags(flagSet)
+}
+
+// AddOTELFlags adds flags supported by the OTEL Collector
+func (opt *Options) AddOTELFlags(flagSet *flag.FlagSet) {
+	flagSet.String(
+		configPrefix+suffixBrokers,
+		defaultBroker,
+		"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
+	flagSet.String(
+		configPrefix+suffixTopic,
+		defaultTopic,
+		"The name of the kafka topic")
+	flagSet.String(
+		configPrefix+suffixProtocolVersion,
+		"",
+		"Kafka protocol version - must be supported by kafka server")
+	flagSet.String(
+		configPrefix+suffixEncoding,
+		defaultEncoding,
+		fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
+	)
 	auth.AddFlags(configPrefix, flagSet)
 }