diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 132de42b81075..a161232ab9305 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -793,6 +793,16 @@ kafka_config:
   # CLI flag: -kafka.write-timeout
   [write_timeout: <duration> | default = 10s]
 
+  # The SASL username for authentication to Kafka using the PLAIN mechanism.
+  # Both username and password must be set.
+  # CLI flag: -kafka.sasl-username
+  [sasl_username: <string> | default = ""]
+
+  # The SASL password for authentication to Kafka using the PLAIN mechanism.
+  # Both username and password must be set.
+  # CLI flag: -kafka.sasl-password
+  [sasl_password: <string> | default = ""]
+
   # The consumer group used by the consumer to track the last consumed offset.
   # The consumer group must be different for each ingester. If the configured
   # consumer group contains the '<partition>' placeholder, it is replaced with
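On the Go side these keys map to `SASLUsername` and `SASLPassword` (a `flagext.Secret`) on `kafka.Config`, and `Validate` rejects configs that set only one of them. A minimal sketch, with placeholder address, topic, and credentials:

```go
package main

import (
	"fmt"

	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/v3/pkg/kafka"
)

func main() {
	cfg := kafka.Config{
		Address:                    "broker-1:9092", // placeholder
		Topic:                      "loki-ingest",   // placeholder
		ProducerMaxRecordSizeBytes: 1048576,
		SASLUsername:               "user",
		SASLPassword:               flagext.SecretWithValue("password"),
	}

	// Validate fails with ErrInconsistentSASLUsernameAndPassword if only one
	// of the two SASL fields is set.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid kafka config:", err)
	}
}
```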
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 30383bfcbbbd4..6f69f0e02a84e 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -47,6 +47,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/ingester"
 	"github.com/grafana/loki/v3/pkg/ingester/client"
 	"github.com/grafana/loki/v3/pkg/kafka"
+	kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
 	"github.com/grafana/loki/v3/pkg/loghttp/push"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
@@ -234,11 +235,11 @@ func New(
 
 	var kafkaWriter KafkaProducer
 	if cfg.KafkaEnabled {
-		kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
+		kafkaClient, err := kafka_client.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
 		if err != nil {
 			return nil, fmt.Errorf("failed to start kafka client: %w", err)
 		}
-		kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
+		kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
 			prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
 	}
 
diff --git a/pkg/kafka/logger.go b/pkg/kafka/client/logger.go
similarity index 98%
rename from pkg/kafka/logger.go
rename to pkg/kafka/client/logger.go
index e055094a4163b..3be96839e1205 100644
--- a/pkg/kafka/logger.go
+++ b/pkg/kafka/client/logger.go
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: AGPL-3.0-only
 
-package kafka
+package client
 
 import (
 	"github.com/go-kit/log"
diff --git a/pkg/kafka/reader_client.go b/pkg/kafka/client/reader_client.go
similarity index 51%
rename from pkg/kafka/reader_client.go
rename to pkg/kafka/client/reader_client.go
index 9237686fee609..e8bbb2da8c86a 100644
--- a/pkg/kafka/reader_client.go
+++ b/pkg/kafka/client/reader_client.go
@@ -1,19 +1,25 @@
 // SPDX-License-Identifier: AGPL-3.0-only
 
-package kafka
+package client
 
 import (
+	"context"
+	"fmt"
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/plugin/kprom"
+
+	"github.com/grafana/loki/v3/pkg/kafka"
 )
 
 // NewReaderClient returns the kgo.Client that should be used by the Reader.
-func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
+func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
 	const fetchMaxBytes = 100_000_000
 
 	opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...)
@@ -33,7 +39,7 @@ func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger,
 		return nil, errors.Wrap(err, "creating kafka client")
 	}
 	if kafkaCfg.AutoCreateTopicEnabled {
-		kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
+		setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
 	}
 	return client, nil
 }
@@ -44,3 +50,29 @@ func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.
 		// Do not export the client ID, because we use it to specify options to the backend.
 		kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
 }
+
+// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set the num.partitions config option on the brokers.
+// This is best-effort: if setting the option fails, the error is logged but not returned.
+func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.Client, logger log.Logger) {
+	if cfg.AutoCreateTopicDefaultPartitions <= 0 {
+		return
+	}
+
+	// Note: this client doesn't get closed because it is owned by the caller
+	adm := kadm.NewClient(cl)
+
+	defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
+	_, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
+		{
+			Op:    kadm.SetConfig,
+			Name:  "num.partitions",
+			Value: &defaultNumberOfPartitions,
+		},
+	})
+	if err != nil {
+		level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
+		return
+	}
+
+	level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
+}
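A usage sketch of the reader path that exercises this best-effort tweak: when `AutoCreateTopicEnabled` is on and `AutoCreateTopicDefaultPartitions` is positive, `NewReaderClient` reuses the freshly created client to issue the broker config change. Address and topic below are placeholders; a nil `*kprom.Metrics` is accepted, as in the tests that follow.

```go
package main

import (
	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/kafka/client"
)

func main() {
	cfg := kafka.Config{
		Address:                          "broker-1:9092", // placeholder
		Topic:                            "loki-ingest",   // placeholder
		AutoCreateTopicEnabled:           true,
		AutoCreateTopicDefaultPartitions: 100,
	}

	readerClient, err := client.NewReaderClient(cfg, nil, log.NewNopLogger())
	if err != nil {
		panic(err)
	}
	defer readerClient.Close()
}
```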
diff --git a/pkg/kafka/client/reader_client_test.go b/pkg/kafka/client/reader_client_test.go
new file mode 100644
index 0000000000000..90980ad0e9128
--- /dev/null
+++ b/pkg/kafka/client/reader_client_test.go
@@ -0,0 +1,104 @@
+package client
+
+import (
+	"context"
+	"testing"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/flagext"
+	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kfake"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kmsg"
+
+	"github.com/grafana/loki/v3/pkg/kafka"
+	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
+)
+
+func TestNewReaderClient(t *testing.T) {
+	_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password"))
+
+	tests := []struct {
+		name    string
+		config  kafka.Config
+		wantErr bool
+	}{
+		{
+			name: "valid config",
+			config: kafka.Config{
+				Address:      addr,
+				Topic:        "abcd",
+				SASLUsername: "user",
+				SASLPassword: flagext.SecretWithValue("password"),
+			},
+			wantErr: false,
+		},
+		{
+			name: "wrong password",
+			config: kafka.Config{
+				Address:      addr,
+				Topic:        "abcd",
+				SASLUsername: "user",
+				SASLPassword: flagext.SecretWithValue("wrong wrong wrong"),
+			},
+			wantErr: true,
+		},
+		{
+			name: "wrong username",
+			config: kafka.Config{
+				Address:      addr,
+				Topic:        "abcd",
+				SASLUsername: "wrong wrong wrong",
+				SASLPassword: flagext.SecretWithValue("password"),
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			client, err := NewReaderClient(tt.config, nil, nil)
+			require.NoError(t, err)
+
+			err = client.Ping(context.Background())
+			if tt.wantErr {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) {
+	cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
+	require.NoError(t, err)
+	t.Cleanup(cluster.Close)
+
+	addrs := cluster.ListenAddrs()
+	require.Len(t, addrs, 1)
+
+	cfg := kafka.Config{
+		Address:                          addrs[0],
+		AutoCreateTopicDefaultPartitions: 100,
+	}
+
+	cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
+		r := request.(*kmsg.AlterConfigsRequest)
+
+		require.Len(t, r.Resources, 1)
+		res := r.Resources[0]
+		require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
+		require.Len(t, res.Configs, 1)
+		cfg := res.Configs[0]
+		require.Equal(t, "num.partitions", cfg.Name)
+		require.NotNil(t, cfg.Value)
+		require.Equal(t, "100", *cfg.Value)
+
+		return &kmsg.AlterConfigsResponse{}, nil, true
+	})
+
+	client, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, log.NewNopLogger())...)
+	require.NoError(t, err)
+
+	setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, client, log.NewNopLogger())
+}
diff --git a/pkg/kafka/writer_client.go b/pkg/kafka/client/writer_client.go
similarity index 90%
rename from pkg/kafka/writer_client.go
rename to pkg/kafka/client/writer_client.go
index 59fefda31d19b..1493e17f51686 100644
--- a/pkg/kafka/writer_client.go
+++ b/pkg/kafka/client/writer_client.go
@@ -1,4 +1,4 @@
-package kafka
+package client
 
 import (
 	"context"
@@ -13,20 +13,30 @@ import (
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
+	"github.com/twmb/franz-go/pkg/sasl/plain"
 	"github.com/twmb/franz-go/plugin/kotel"
 	"github.com/twmb/franz-go/plugin/kprom"
 	"go.opentelemetry.io/otel/propagation"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/atomic"
 
+	"github.com/grafana/loki/v3/pkg/kafka"
 	"github.com/grafana/loki/v3/pkg/util/constants"
 )
 
+var (
+	// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
+	// You can think of this overhead as extra time for requests sitting in the client's buffer
+	// before being sent on the wire, plus the actual time it takes to send them over the network and
+	// start being processed by Kafka.
+	writerRequestTimeoutOverhead = 2 * time.Second
+)
+
 // NewWriterClient returns the kgo.Client that should be used by the Writer.
 //
 // The input prometheus.Registerer must be wrapped with a prefix (the names of metrics
 // registered don't have a prefix).
-func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
+func NewWriterClient(kafkaCfg kafka.Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
 	// Do not export the client ID, because we use it to specify options to the backend.
 	metrics := kprom.NewMetrics(
 		"", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix.
@@ -42,7 +52,7 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log
 		kgo.RecordPartitioner(kgo.ManualPartitioner()),
 
 		// Set the upper bounds the size of a record batch.
-		kgo.ProducerBatchMaxBytes(producerBatchMaxBytes),
+		kgo.ProducerBatchMaxBytes(kafka.ProducerBatchMaxBytes),
 
 		// By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency
 		// (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher
@@ -81,10 +91,14 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log
 		kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited".
 		kgo.MaxBufferedBytes(0),
 	)
+	client, err := kgo.NewClient(opts...)
+	if err != nil {
+		return nil, err
+	}
 	if kafkaCfg.AutoCreateTopicEnabled {
-		kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
+		setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
 	}
-	return kgo.NewClient(opts...)
+	return client, nil
 }
 
 type onlySampledTraces struct {
@@ -99,7 +113,7 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM
 	o.TextMapPropagator.Inject(ctx, carrier)
 }
 
-func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
+func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
 	opts := []kgo.Opt{
 		kgo.ClientID(cfg.ClientID),
 		kgo.SeedBrokers(cfg.Address),
@@ -139,6 +153,16 @@ func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Log
 		}),
 	}
 
+	// SASL plain auth.
+	if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" {
+		opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
+			return plain.Auth{
+				User: cfg.SASLUsername,
+				Pass: cfg.SASLPassword.String(),
+			}, nil
+		})))
+	}
+
 	if cfg.AutoCreateTopicEnabled {
 		opts = append(opts, kgo.AllowAutoTopicCreation())
 	}
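Because the SASL options live in `commonKafkaClientOptions`, both the reader and writer pick up PLAIN authentication purely from the config. A hedged end-to-end sketch for the writer side (placeholder broker, topic, and credentials):

```go
package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/kafka/client"
)

func main() {
	cfg := kafka.Config{
		Address:      "broker-1:9092", // placeholder
		Topic:        "loki-ingest",   // placeholder
		WriteTimeout: 10 * time.Second,
		SASLUsername: "user",
		SASLPassword: flagext.SecretWithValue("password"),
	}

	writer, err := client.NewWriterClient(cfg, 20, log.NewNopLogger(), prometheus.NewRegistry())
	if err != nil {
		panic(err)
	}
	defer writer.Close()

	// Ping forces a connection and therefore the SASL handshake, as the tests below do.
	if err := writer.Ping(context.Background()); err != nil {
		panic(err)
	}
}
```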
diff --git a/pkg/kafka/client/writer_client_test.go b/pkg/kafka/client/writer_client_test.go
new file mode 100644
index 0000000000000..4feb782ffe639
--- /dev/null
+++ b/pkg/kafka/client/writer_client_test.go
@@ -0,0 +1,71 @@
+package client
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/grafana/dskit/flagext"
+	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kfake"
+
+	"github.com/grafana/loki/v3/pkg/kafka"
+	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
+)
+
+func TestNewWriterClient(t *testing.T) {
+	_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password"))
+
+	tests := []struct {
+		name    string
+		config  kafka.Config
+		wantErr bool
+	}{
+		{
+			name: "valid config",
+			config: kafka.Config{
+				Address:      addr,
+				Topic:        "abcd",
+				WriteTimeout: time.Second,
+				SASLUsername: "user",
+				SASLPassword: flagext.SecretWithValue("password"),
+			},
+			wantErr: false,
+		},
+		{
+			name: "wrong password",
+			config: kafka.Config{
+				Address:      addr,
+				Topic:        "abcd",
+				WriteTimeout: time.Second,
+				SASLUsername: "user",
+				SASLPassword: flagext.SecretWithValue("wrong wrong wrong"),
+			},
+			wantErr: true,
+		},
+		{
+			name: "wrong username",
+			config: kafka.Config{
+				Address:      addr,
+				Topic:        "abcd",
+				WriteTimeout: time.Second,
+				SASLUsername: "wrong wrong wrong",
+				SASLPassword: flagext.SecretWithValue("password"),
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			client, err := NewWriterClient(tt.config, 10, nil, nil)
+			require.NoError(t, err)
+
+			err = client.Ping(context.Background())
+			if tt.wantErr {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go
index 13cfb618cfdb9..09008bec93411 100644
--- a/pkg/kafka/config.go
+++ b/pkg/kafka/config.go
@@ -1,7 +1,6 @@
 package kafka
 
 import (
-	"context"
 	"errors"
 	"flag"
 	"fmt"
@@ -9,10 +8,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
-	"github.com/twmb/franz-go/pkg/kadm"
-	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/grafana/dskit/flagext"
 )
 
 const (
@@ -21,29 +17,24 @@ const (
 	consumeFromEnd        = "end"
 	consumeFromTimestamp  = "timestamp"
 
-	// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
-	// You can think about this overhead as an extra time for requests sitting in the client's buffer
-	// before being sent on the wire and the actual time it takes to send it over the network and
-	// start being processed by Kafka.
-	writerRequestTimeoutOverhead = 2 * time.Second
-
-	// producerBatchMaxBytes is the max allowed size of a batch of Kafka records.
-	producerBatchMaxBytes = 16_000_000
+	// ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records.
+	ProducerBatchMaxBytes = 16_000_000
 
 	// maxProducerRecordDataBytesLimit is the max allowed size of a single record data. Given we have a limit
-	// on the max batch size (producerBatchMaxBytes), a Kafka record data can't be bigger than the batch size
+	// on the max batch size (ProducerBatchMaxBytes), a Kafka record data can't be bigger than the batch size
 	// minus some overhead required to serialise the batch and the record itself. We use 16KB as such overhead
 	// in the worst case scenario, which is expected to be way above the actual one.
-	maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384
+	maxProducerRecordDataBytesLimit = ProducerBatchMaxBytes - 16384
 	minProducerRecordDataBytesLimit = 1024 * 1024
 )
 
 var (
-	ErrMissingKafkaAddress               = errors.New("the Kafka address has not been configured")
-	ErrMissingKafkaTopic                 = errors.New("the Kafka topic has not been configured")
-	ErrInconsistentConsumerLagAtStartup  = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
-	ErrInvalidMaxConsumerLagAtStartup    = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag")
-	ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
+	ErrMissingKafkaAddress                 = errors.New("the Kafka address has not been configured")
+	ErrMissingKafkaTopic                   = errors.New("the Kafka topic has not been configured")
+	ErrInconsistentConsumerLagAtStartup    = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
+	ErrInvalidMaxConsumerLagAtStartup      = errors.New("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag")
+	ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
+	ErrInvalidProducerMaxRecordSizeBytes   = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
 )
 
 // Config holds the generic config for the Kafka backend.
@@ -54,6 +45,9 @@ type Config struct {
 	DialTimeout  time.Duration `yaml:"dial_timeout"`
 	WriteTimeout time.Duration `yaml:"write_timeout"`
 
+	SASLUsername string         `yaml:"sasl_username"`
+	SASLPassword flagext.Secret `yaml:"sasl_password"`
+
 	ConsumerGroup                     string        `yaml:"consumer_group"`
 	ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`
 
@@ -80,6 +74,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
 	f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")
 
+	f.StringVar(&cfg.SASLUsername, prefix+".sasl-username", "", "The SASL username for authentication to Kafka using the PLAIN mechanism. Both username and password must be set.")
+	f.Var(&cfg.SASLPassword, prefix+".sasl-password", "The SASL password for authentication to Kafka using the PLAIN mechanism. Both username and password must be set.")
+
 	f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.")
 	f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.")
 
@@ -113,6 +110,10 @@ func (cfg *Config) Validate() error {
 		return ErrInvalidMaxConsumerLagAtStartup
 	}
 
+	if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") {
+		return ErrInconsistentSASLUsernameAndPassword
+	}
+
 	return nil
 }
 
@@ -124,35 +125,3 @@ func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string
 
 	return strings.ReplaceAll(cfg.ConsumerGroup, "<partition>", strconv.Itoa(int(partitionID)))
 }
-
-// SetDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
-// This is best-effort, if setting the option fails, error is logged, but not returned.
-func (cfg Config) SetDefaultNumberOfPartitionsForAutocreatedTopics(logger log.Logger) {
-	if cfg.AutoCreateTopicDefaultPartitions <= 0 {
-		return
-	}
-
-	cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed to create kafka client", "err", err)
-		return
-	}
-
-	adm := kadm.NewClient(cl)
-	defer adm.Close()
-
-	defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
-	_, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
-		{
-			Op:    kadm.SetConfig,
-			Name:  "num.partitions",
-			Value: &defaultNumberOfPartitions,
-		},
-	})
-	if err != nil {
-		level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
-		return
-	}
-
-	level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
-}
diff --git a/pkg/kafka/config_test.go b/pkg/kafka/config_test.go
index 7c21e38fd141e..87c456f42adc0 100644
--- a/pkg/kafka/config_test.go
+++ b/pkg/kafka/config_test.go
@@ -3,39 +3,37 @@ package kafka
 import (
 	"testing"
 
-	"github.com/go-kit/log"
+	"github.com/grafana/dskit/flagext"
 	"github.com/stretchr/testify/require"
-	"github.com/twmb/franz-go/pkg/kfake"
-	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
-func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) {
-	cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
-	require.NoError(t, err)
-	t.Cleanup(cluster.Close)
-
-	addrs := cluster.ListenAddrs()
-	require.Len(t, addrs, 1)
-
+func TestBothSASLParamsMustBeSet(t *testing.T) {
 	cfg := Config{
-		Address:                          addrs[0],
-		AutoCreateTopicDefaultPartitions: 100,
+		// Other required params
+		Address:                    "abcd",
+		Topic:                      "abcd",
+		ProducerMaxRecordSizeBytes: 1048576,
 	}
 
-	cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
-		r := request.(*kmsg.AlterConfigsRequest)
-
-		require.Len(t, r.Resources, 1)
-		res := r.Resources[0]
-		require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
-		require.Len(t, res.Configs, 1)
-		cfg := res.Configs[0]
-		require.Equal(t, "num.partitions", cfg.Name)
-		require.NotNil(t, *cfg.Value)
-		require.Equal(t, "100", *cfg.Value)
-
-		return &kmsg.AlterConfigsResponse{}, nil, true
-	})
+	// No SASL params is valid
+	err := cfg.Validate()
+	require.NoError(t, err)
 
-	cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger())
+	// Just username is invalid
+	cfg.SASLUsername = "abcd"
+	cfg.SASLPassword = flagext.Secret{}
+	err = cfg.Validate()
+	require.Error(t, err)
+
+	// Just password is invalid
+	cfg.SASLUsername = ""
+	cfg.SASLPassword = flagext.SecretWithValue("abcd")
+	err = cfg.Validate()
+	require.Error(t, err)
+
+	// Setting both username and password is valid
+	cfg.SASLUsername = "abcd"
+	cfg.SASLPassword = flagext.SecretWithValue("abcd")
+	err = cfg.Validate()
+	require.NoError(t, err)
 }
diff --git a/pkg/kafka/partition/committer_test.go b/pkg/kafka/partition/committer_test.go
index 9ef02f910e5d0..1739986cd66c8 100644
--- a/pkg/kafka/partition/committer_test.go
+++ b/pkg/kafka/partition/committer_test.go
@@ -14,7 +14,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus/testutil"
 
-	"github.com/grafana/loki/v3/pkg/kafka"
+	"github.com/grafana/loki/v3/pkg/kafka/client"
 	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
 )
 
@@ -24,7 +24,7 @@ func TestPartitionCommitter(t *testing.T) {
 	topicName := "test-topic"
 	_, kafkaCfg := testkafka.CreateCluster(t, numPartitions, topicName)
 
-	client, err := kafka.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger())
+	client, err := client.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger())
 	require.NoError(t, err)
 
 	// Create a Kafka admin client
diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index 74f18b02057f3..a6038962222cb 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -22,6 +22,7 @@ import (
 	"github.com/twmb/franz-go/plugin/kprom"
 
 	"github.com/grafana/loki/v3/pkg/kafka"
+	"github.com/grafana/loki/v3/pkg/kafka/client"
 )
 
 var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded")
@@ -94,7 +95,7 @@ func NewReader(
 // This method is called when the PartitionReader service starts.
 func (p *Reader) start(ctx context.Context) error {
 	var err error
-	p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger)
+	p.client, err = client.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger)
 	if err != nil {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
@@ -535,7 +536,7 @@ func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
 	return readerMetrics{
 		receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
 		receiveDelayWhenRunning:  receiveDelay.WithLabelValues("running"),
-		kprom:                    kafka.NewReaderClientMetrics("partition-reader", reg),
+		kprom:                    client.NewReaderClientMetrics("partition-reader", reg),
 		fetchWaitDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name:                        "loki_ingest_storage_reader_records_batch_wait_duration_seconds",
 			Help:                        "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.",
diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go
index 8d548c8312411..dfd653de78e3d 100644
--- a/pkg/kafka/partition/reader_test.go
+++ b/pkg/kafka/partition/reader_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/twmb/franz-go/pkg/kgo"
 
 	"github.com/grafana/loki/v3/pkg/kafka"
+	"github.com/grafana/loki/v3/pkg/kafka/client"
 	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
@@ -58,7 +59,7 @@ func (m *mockConsumer) Flush(ctx context.Context) error {
 }
 
 func TestPartitionReader_BasicFunctionality(t *testing.T) {
-	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
+	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
 	consumer := newMockConsumer()
 
 	consumerFactory := func(_ Committer) (Consumer, error) {
@@ -67,7 +68,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) {
 
 	partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, err)
-	producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
+	producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, err)
 
 	err = services.StartAndAwaitRunning(context.Background(), partitionReader)
@@ -82,8 +83,8 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, records, 1)
 
-	producer.ProduceSync(context.Background(), records...)
-	producer.ProduceSync(context.Background(), records...)
+	require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr())
+	require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr())
 
 	// Wait for records to be processed
 	assert.Eventually(t, func() bool {
@@ -121,7 +122,7 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
 
 	partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, err)
-	producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
+	producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, err)
 
 	stream := logproto.Stream{
@@ -175,11 +176,11 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
 	partitionID := int32(0)
 	partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, err)
-	producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
+	producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, err)
 
 	// Init the client: This usually happens in "start" but we want to manage our own lifecycle for this test.
-	partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
+	partitionReader.client, err = client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
 		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
 			kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()},
 		}),
diff --git a/pkg/kafka/testkafka/cluster.go b/pkg/kafka/testkafka/cluster.go
index cc5847c2bfd35..c70e3da4a71cb 100644
--- a/pkg/kafka/testkafka/cluster.go
+++ b/pkg/kafka/testkafka/cluster.go
@@ -16,8 +16,8 @@ import (
 )
 
 // CreateCluster returns a fake Kafka cluster for unit testing.
-func CreateCluster(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, kafka.Config) {
-	cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName)
+func CreateCluster(t testing.TB, numPartitions int32, topicName string, opts ...kfake.Opt) (*kfake.Cluster, kafka.Config) {
+	cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName, opts...)
 	addSupportForConsumerGroups(t, cluster, topicName, numPartitions)
 
 	return cluster, createTestKafkaConfig(addr, topicName)
@@ -34,8 +34,16 @@ func createTestKafkaConfig(clusterAddr, topicName string) kafka.Config {
 	return cfg
 }
 
-func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, string) {
-	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(numPartitions, topicName))
+func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string, opts ...kfake.Opt) (*kfake.Cluster, string) {
+	cfg := []kfake.Opt{
+		kfake.NumBrokers(1),
+		kfake.SeedTopics(numPartitions, topicName),
+	}
+
+	// Append any caller-provided options.
+	cfg = append(cfg, opts...)
+
+	cluster, err := kfake.NewCluster(cfg...)
 	require.NoError(t, err)
 	t.Cleanup(cluster.Close)
 
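With the options forwarded, tests can spin up a SASL-enabled fake broker and point a matching `kafka.Config` at it. A hypothetical test mirroring the client tests above (arbitrary test credentials):

```go
package example_test

import (
	"testing"

	"github.com/grafana/dskit/flagext"
	"github.com/twmb/franz-go/pkg/kfake"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

func TestAgainstSASLEnabledCluster(t *testing.T) {
	_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test",
		kfake.EnableSASL(),
		kfake.Superuser("PLAIN", "user", "password"),
	)

	cfg := kafka.Config{
		Address:      addr,
		Topic:        "test",
		SASLUsername: "user",
		SASLPassword: flagext.SecretWithValue("password"),
	}
	_ = cfg // build reader/writer clients from cfg, as in the client tests above
}
```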
diff --git a/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go b/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go
new file mode 100644
index 0000000000000..97a9369d13723
--- /dev/null
+++ b/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go
@@ -0,0 +1,60 @@
+// Package plain provides PLAIN sasl authentication as specified in RFC4616.
+package plain
+
+import (
+	"context"
+	"errors"
+
+	"github.com/twmb/franz-go/pkg/sasl"
+)
+
+// Auth contains information for authentication.
+type Auth struct {
+	// Zid is an optional authorization ID to use in authenticating.
+	Zid string
+
+	// User is username to use for authentication.
+	User string
+
+	// Pass is the password to use for authentication.
+	Pass string
+
+	_ struct{} // require explicit field initialization
+}
+
+// AsMechanism returns a sasl mechanism that will use 'a' as credentials for
+// all sasl sessions.
+//
+// This is a shortcut for using the Plain function and is useful when you do
+// not need to live-rotate credentials.
+func (a Auth) AsMechanism() sasl.Mechanism {
+	return Plain(func(context.Context) (Auth, error) {
+		return a, nil
+	})
+}
+
+// Plain returns a sasl mechanism that will call authFn whenever sasl
+// authentication is needed. The returned Auth is used for a single session.
+func Plain(authFn func(context.Context) (Auth, error)) sasl.Mechanism {
+	return plain(authFn)
+}
+
+type plain func(context.Context) (Auth, error)
+
+func (plain) Name() string { return "PLAIN" }
+func (fn plain) Authenticate(ctx context.Context, _ string) (sasl.Session, []byte, error) {
+	auth, err := fn(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	if auth.User == "" || auth.Pass == "" {
+		return nil, nil, errors.New("PLAIN user and pass must be non-empty")
+	}
+	return session{}, []byte(auth.Zid + "\x00" + auth.User + "\x00" + auth.Pass), nil
+}
+
+type session struct{}
+
+func (session) Challenge([]byte) (bool, []byte, error) {
+	return true, nil, nil
+}
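For completeness, the vendored package also offers `Auth.AsMechanism` as a shortcut when credentials never rotate; a minimal sketch with placeholder values (Loki itself uses the callback form via `plain.Plain`):

```go
package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

func main() {
	// Static placeholder credentials; per RFC 4616 the initial response sent
	// on the wire is "zid\x00user\x00pass".
	mech := plain.Auth{User: "user", Pass: "password"}.AsMechanism()

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("broker-1:9092"), // placeholder broker
		kgo.SASL(mech),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}
```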
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 49e7bb611899f..8e8e074487f93 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1602,6 +1602,7 @@ github.com/twmb/franz-go/pkg/kgo
 github.com/twmb/franz-go/pkg/kgo/internal/sticky
 github.com/twmb/franz-go/pkg/kversion
 github.com/twmb/franz-go/pkg/sasl
+github.com/twmb/franz-go/pkg/sasl/plain
 # github.com/twmb/franz-go/pkg/kadm v1.13.0
 ## explicit; go 1.21
 github.com/twmb/franz-go/pkg/kadm