Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): Add support for SASL auth to Kafka #14487

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix tests
  • Loading branch information
benclive committed Oct 15, 2024
commit 83bc405c8512ba0908b4d30b14a81d484ea34ae9
2 changes: 1 addition & 1 deletion pkg/kafka/client/reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.
return
}

// Note: this client doesn't get closed because it is owned by the caller
adm := kadm.NewClient(cl)
defer adm.Close()

defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
_, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
Expand Down
6 changes: 3 additions & 3 deletions pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *mockConsumer) Flush(ctx context.Context) error {
}

func TestPartitionReader_BasicFunctionality(t *testing.T) {
_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
_, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

consumerFactory := func(_ Committer) (Consumer, error) {
Expand All @@ -83,8 +83,8 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) {
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
producer.ProduceSync(context.Background(), records...)
require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr())
require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr())

// Wait for records to be processed
assert.Eventually(t, func() bool {
Expand Down
Loading