Add Kafka exporter #1439
Conversation
Codecov Report
```
@@            Coverage Diff             @@
##           master    #1439      +/-   ##
==========================================
+ Coverage   90.83%   90.88%   +0.05%
==========================================
  Files         234      237       +3
  Lines       16472    16541      +69
==========================================
+ Hits        14962    15033      +71
+ Misses       1082     1080       -2
  Partials      428      428
```
Continue to review the full report at Codecov.
```go
type Config struct {
	configmodels.ExporterSettings `mapstructure:",squash"`
	// The list of kafka brokers (default localhost:9092)
	Brokers []string `mapstructure:"brokers"`
```
Is this required to be a list? What is the behavior?
The Kafka client accepts a list of brokers, not just a single address, which can be handy in statically configured environments.
It is the initial list of broker addresses used to discover all nodes: https://docs.confluent.io/current/installation/configuration/consumer-configs.html#bootstrap.servers
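To illustrate the bootstrap-list semantics described above, here is a rough sketch of validating and defaulting the broker list before building the client. `normalizeBrokers` and `defaultBroker` are hypothetical helpers for this example, not the exporter's actual code:

```go
package main

import (
	"errors"
	"fmt"
)

// defaultBroker mirrors the documented default (localhost:9092).
const defaultBroker = "localhost:9092"

// normalizeBrokers is a hypothetical helper: it applies the default when no
// brokers are configured and rejects empty entries. The list is only the
// initial bootstrap set — the Kafka client uses it to discover the rest of
// the cluster, which is why more than one address is accepted.
func normalizeBrokers(brokers []string) ([]string, error) {
	if len(brokers) == 0 {
		return []string{defaultBroker}, nil
	}
	for _, b := range brokers {
		if b == "" {
			return nil, errors.New("broker address must not be empty")
		}
	}
	return brokers, nil
}

func main() {
	got, err := normalizeBrokers(nil)
	fmt.Println(got, err) // [localhost:9092] <nil>
}
```

Listing several brokers simply makes bootstrap resilient: the client can still discover the cluster if one seed address is down.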
```go
	}
	c.Version = version
}
producer, err := sarama.NewAsyncProducer(config.Brokers, c)
```
Why an async producer vs. sync? I would suggest using the sync producer and relying on the queued/retry logic from the exporterhelper if possible.
It is good to have consistent queuing and retry logic across exporters.
I agree on having consistent logic across exporters; we took the same approach in the ES exporter. However, this might be suboptimal for a couple of reasons:
- multiple serializations if a retry happens
- the driver can immediately retry some failed requests
I have switched to the sync producer for now. We can switch to async if we prove that it performs better; it is just an implementation detail.
Some high-level concerns/questions:
```go
request := &otlptrace.ExportTraceServiceRequest{
	ResourceSpans: pdata.TracesToOtlp(td),
}
```
Force-pushed from 437dc0a to d5c4ece
Signed-off-by: Pavol Loffay <[email protected]>
Force-pushed from 733ab4b to db1fc33
Signed-off-by: Pavol Loffay <[email protected]>
@bogdandrutu I have updated the PR; it's ready for a second pass.
exporter/kafkaexporter/config.go (Outdated)
```go
// Retry defines retry configuration for Metadata.
type Retry struct {
	// The total number of times to retry a metadata request when the
	// cluster is in the middle of a leader election or at startup (default 3).
	Max int `mapstructure:"max"`
	// How long to wait for leader election to occur before retrying
	// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
	BackOff time.Duration `mapstructure:"backoff"`
}
```
Use RetrySettings from the exporter helper.
This configuration is not for retrying failed batches to the broker.
This struct configures retries for fetching metadata from the broker, which happens at startup or when the broker is in the middle of a leader election. The setting is useful when there is a race condition between broker and producer startup, e.g. in a docker-compose deployment.
Also, exporterhelper.RetrySettings does not define a maximum number of retries (an int), and it exposes other settings that do not apply to the Kafka metadata retry.
```go
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.BackOff
```
For consistency with other protocols, I would suggest using the https://github.com/open-telemetry/opentelemetry-collector/blob/master/exporter/exporterhelper/queued_retry.go logic to control queuing and retry, and disabling the native retries from the sarama client.
See my comment above #1439 (comment)
I think overall this is very close to being done.
Kafka exporter exports traces to Kafka.

The following settings are required:
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0
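Based on the mapstructure tags quoted in this review (`brokers`, and `max`/`backoff` under the metadata Retry struct), a collector configuration for this exporter might look roughly like the following. The `metadata.retry` nesting is inferred from the Go structs and the exact schema may differ in the merged implementation:

```yaml
exporters:
  kafka:
    # Initial bootstrap list of brokers (default localhost:9092).
    brokers:
      - localhost:9092
    # Required: Kafka protocol version, e.g. 2.0.0.
    protocol_version: 2.0.0
    metadata:
      retry:
        # Retries for the metadata request at startup or during
        # a leader election — not for failed trace batches.
        max: 3
        backoff: 250ms
```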
Please mention in the readme which proto format we write to the queue.
There will be a follow-up PR to make that configurable.
Since we already import Jaeger and Zipkin, we should make the encoding configurable; this will allow folks an easy migration.
@pavolloffay there are a few other comments not addressed. Please fix those and we are good to go.
Signed-off-by: Pavol Loffay <[email protected]>
@bogdandrutu I have addressed the comments and added more docs, especially explaining that the metadata retry is not the retry for failed messages.
@pavolloffay will merge and I will do some cleanups.
Signed-off-by: Pavol Loffay [email protected]
Description:
Notable changes:
- otlptrace.ResourceSpans

Link to tracking Issue:
Created from #1410 (comment)
Resolves open-telemetry/opentelemetry-collector-contrib#268
Related to #1331
Testing:
Tested locally with Confluent 5.0.0 (https://docs.confluent.io/5.0.0/installation/docker/docs/installation/single-node-client.html) and #1410
Documentation:
Added Kafka exporter readme