Skip to content

Commit

Permalink
Expose UnknownTopicRetries
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Nov 6, 2024
1 parent 13f9d90 commit d78b865
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
3 changes: 3 additions & 0 deletions kafka/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc) (*Producer, sdkm
TopicAttributeFunc: tafunc,
},
Sync: true,
// NOTE(marclop) avoids lengthy retries that could cause the tests
// to timeout.
UnknownTopicRetries: 0,
})
return producer, rdr
}
Expand Down
22 changes: 21 additions & 1 deletion kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ type ProducerConfig struct {
// RecordPartitioner is a function that returns the partition to which
// a record should be sent. If nil, the default partitioner is used.
RecordPartitioner kgo.Partitioner

// UnknownTopicRetries specifies the number of times to retry producing
// to an unknown topic before giving up. If set to `-1`, it will retry
// indefinitely. If set to `-2`, it will never retry. If set to `0`, it
// will use the default 4 retries.
// NOTE(marclop) This behavior is different than the kgo library, which
// uses 0 as the never retry and if not set, it uses a default of 4.
UnknownTopicRetries int
}

// BatchWriteListener specifies a callback function that is invoked after a batch is
Expand Down Expand Up @@ -162,7 +170,19 @@ func NewProducer(cfg ProducerConfig) (*Producer, error) {
if err := cfg.finalize(); err != nil {
return nil, fmt.Errorf("kafka: invalid producer config: %w", err)
}
var opts []kgo.Opt

var retries int
switch cfg.UnknownTopicRetries {
case 0: // Default non-breakng value.
retries = 4
case -2: // Never retry
retries = 0
default:
retries = cfg.UnknownTopicRetries
}
opts := []kgo.Opt{
kgo.UnknownTopicRetries(retries),
}
if len(cfg.CompressionCodec) > 0 {
opts = append(opts, kgo.ProducerBatchCompression(cfg.CompressionCodec...))
}
Expand Down

0 comments on commit d78b865

Please sign in to comment.