Skip to content

Commit

Permalink
Enable queueing, retry, timeout for Kafka exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jul 29, 2020
1 parent 1d31f0f commit 90d64b1
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 10 deletions.
21 changes: 18 additions & 3 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,25 @@ The following settings are required:
The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers
- `topic` (default = otlp_spans): The name of the kafka topic to export to
- `metadata.full` (default = true): Whether to maintain a full set of metadata.
- `metadata`
- `full` (default = true): Whether to maintain a full set of metadata.
When disabled, the client does not make the initial request to the broker at startup.
- `metadata.retry.max` (default = 3): The number of retries to get metadata
- `metadata.retry.backoff` (default = 250ms): How long to wait between metadata retries
- `retry`
- `max` (default = 3): The number of retries to get metadata
- `backoff` (default = 250ms): How long to wait between metadata retries
- `timeout` (default = 5s): The timeout for every attempt to send data to the backend.
- `retry_on_failure`
- `enabled` (default = true)
- `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `enabled` is `false`
- `max_interval` (default = 30s): The upper bound on backoff; ignored if `enabled` is `false`
- `max_elapsed_time` (default = 120s): The maximum amount of time spent trying to send a batch; ignored if `enabled` is `false`
- `sending_queue`
- `enabled` (default = false)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data; ignored if `enabled` is `false`;
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per second.

Example configuration:

Expand Down
7 changes: 6 additions & 1 deletion exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Kafka exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`
// Kafka protocol version
Expand Down
16 changes: 16 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package kafkaexporter
import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -42,6 +44,20 @@ func TestLoadConfig(t *testing.T) {
NameVal: typeStr,
TypeVal: typeStr,
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Brokers: []string{"foo:123", "bar:456"},
Metadata: Metadata{
Expand Down
19 changes: 15 additions & 4 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,19 @@ func NewFactory() component.ExporterFactory {
}

func createDefaultConfig() configmodels.Exporter {
// TODO: Enable the queued settings by default.
qs := exporterhelper.CreateDefaultQueueSettings()
qs.Enabled = false
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
Brokers: []string{defaultBroker},
Topic: defaultTopic,
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: qs,
Brokers: []string{defaultBroker},
Topic: defaultTopic,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand All @@ -66,13 +72,18 @@ func createTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
c := cfg.(*Config)
exp, err := newExporter(*c, params)
oCfg := cfg.(*Config)
exp, err := newExporter(*oCfg, params)
if err != nil {
return nil, err
}
return exporterhelper.NewTraceExporter(
cfg,
exp.traceDataPusher,
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
// and will rely on the sarama Producer Timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithShutdown(exp.Close))
}
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func newExporter(config Config, params component.ExporterCreateParams) (*kafkaPr
c.Producer.Return.Errors = true
// Wait only the local commit to succeed before responding.
c.Producer.RequiredAcks = sarama.WaitForLocal
// Because sarama does not accept a Context for every message, set the Timeout here.
c.Producer.Timeout = config.Timeout
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
Expand Down
9 changes: 7 additions & 2 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func TestTraceDataPusher(t *testing.T) {
producer: producer,
marshaller: &protoMarshaller{},
}
defer p.Close(context.Background())
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
droppedSpans, err := p.traceDataPusher(context.Background(), testdata.GenerateTraceDataTwoSpansSameResource())
require.NoError(t, err)
assert.Equal(t, 0, droppedSpans)
Expand All @@ -63,7 +65,9 @@ func TestTraceDataPusher_err(t *testing.T) {
marshaller: &protoMarshaller{},
logger: zap.NewNop(),
}
defer p.Close(context.Background())
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
td := testdata.GenerateTraceDataTwoSpansSameResource()
droppedSpans, err := p.traceDataPusher(context.Background(), td)
assert.EqualError(t, err, expErr.Error())
Expand All @@ -78,6 +82,7 @@ func TestTraceDataPusher_marshall_error(t *testing.T) {
}
td := testdata.GenerateTraceDataTwoSpansSameResource()
droppedSpans, err := p.traceDataPusher(context.Background(), td)
require.Error(t, err)
assert.Contains(t, err.Error(), expErr.Error())
assert.Equal(t, td.SpanCount(), droppedSpans)
}
Expand Down
10 changes: 10 additions & 0 deletions exporter/kafkaexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ exporters:
full: false
retry:
max: 15
timeout: 10s
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
max_interval: 60s
max_elapsed_time: 10m

processors:
exampleprocessor:
Expand Down

0 comments on commit 90d64b1

Please sign in to comment.