Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ccl: add support for additional sasl mechanisms (SCRAM-SHA-256 and SCRAM-SHA-512) #60150

Merged
merged 2 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/twpayne/go-geom v1.3.6
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292
go.etcd.io/etcd/raft/v3 v3.0.0-20210215124703-719f6ce06fbc
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,9 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"metrics.go",
"name.go",
"rowfetcher_cache.go",
"scram_client.go",
"sink.go",
"sink_cloudstorage.go",
"testing_knobs.go",
Expand Down Expand Up @@ -92,6 +93,7 @@ go_library(
"@com_github_google_btree//:btree",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_scram//:scram",
],
)

Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,14 @@ func TestChangefeedErrors(t *testing.T) {
t, `sasl_enabled must be enabled if a SASL password is provided`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_password=a`,
)
sqlDB.ExpectErr(
t, `sasl_enabled must be enabled to configure SASL mechanism`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=SCRAM-SHA-256`,
)
sqlDB.ExpectErr(
t, `param sasl_mechanism must be one of SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN`,
`CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_mechanism=unsuppported`,
)

// The avro format doesn't support key_in_value yet.
sqlDB.ExpectErr(
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const (
SinkParamSASLHandshake = `sasl_handshake`
SinkParamSASLUser = `sasl_user`
SinkParamSASLPassword = `sasl_password`
SinkParamSASLMechanism = `sasl_mechanism`
)

// ChangefeedOptionExpectValues is used to parse changefeed options using
Expand Down
61 changes: 61 additions & 0 deletions pkg/ccl/changefeedccl/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"crypto/sha256"
"crypto/sha512"

"github.com/Shopify/sarama"
"github.com/xdg/scram"
)

var (
// sha256ClientGenerator returns a SCRAMClient for the
// SCRAM-SHA-256 SASL mechanism. This can used as a
// SCRAMCLientGeneratorFunc when constructing a sarama SASL
// configuration.
sha256ClientGenerator = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: sha256.New}
}

// sha512ClientGenerator returns a SCRAMClient for the
// SCRAM-SHA-512 SASL mechanism. This can used as a
// SCRAMCLientGeneratorFunc when constructing a sarama SASL
// configuration.
sha512ClientGenerator = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: sha512.New}
}
)

type scramClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

var _ sarama.SCRAMClient = &scramClient{}

func (c *scramClient) Begin(userName, password, authzID string) error {
var err error
c.Client, err = c.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
c.ClientConversation = c.Client.NewConversation()
return nil
}

func (c *scramClient) Step(challenge string) (string, error) {
return c.ClientConversation.Step(challenge)
}

func (c *scramClient) Done() bool {
return c.ClientConversation.Done()
}
25 changes: 25 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,23 @@ func getSink(
}
cfg.saslHandshake = b
}

cfg.saslMechanism = q.Get(changefeedbase.SinkParamSASLMechanism)
q.Del(changefeedbase.SinkParamSASLMechanism)
if cfg.saslMechanism != `` && !cfg.saslEnabled {
return nil, errors.Errorf(`%s must be enabled to configure SASL mechanism`, changefeedbase.SinkParamSASLEnabled)
}
if cfg.saslMechanism == `` {
cfg.saslMechanism = sarama.SASLTypePlaintext
}
switch cfg.saslMechanism {
case sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypePlaintext:
default:
return nil, errors.Errorf(`param %s must be one of %s, %s, or %s`,
changefeedbase.SinkParamSASLMechanism,
sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypePlaintext)
}

cfg.saslUser = q.Get(changefeedbase.SinkParamSASLUser)
q.Del(changefeedbase.SinkParamSASLUser)
cfg.saslPassword = q.Get(changefeedbase.SinkParamSASLPassword)
Expand Down Expand Up @@ -305,6 +322,7 @@ type kafkaSinkConfig struct {
saslHandshake bool
saslUser string
saslPassword string
saslMechanism string
targetNames map[descpb.ID]string
}

Expand Down Expand Up @@ -393,6 +411,13 @@ func makeKafkaSink(
config.Net.SASL.Handshake = cfg.saslHandshake
config.Net.SASL.User = cfg.saslUser
config.Net.SASL.Password = cfg.saslPassword
config.Net.SASL.Mechanism = sarama.SASLMechanism(cfg.saslMechanism)
switch config.Net.SASL.Mechanism {
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = sha512ClientGenerator
case sarama.SASLTypeSCRAMSHA256:
config.Net.SASL.SCRAMClientGeneratorFunc = sha256ClientGenerator
}
}

// When we emit messages to sarama, they're placed in a queue (as does any
Expand Down
85 changes: 67 additions & 18 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ func runCDCKafkaAuth(ctx context.Context, t *test, c *cluster) {
kafka.install(ctx)
testCerts := kafka.configureAuth(ctx)
kafka.start(ctx, "kafka")
kafka.addSCRAMUsers(ctx)
defer kafka.stop(ctx)

db := c.Conn(ctx, 1)
Expand All @@ -530,26 +531,45 @@ func runCDCKafkaAuth(ctx context.Context, t *test, c *cluster) {
t.Fatal(err)
}

var jobID int
if err := db.QueryRow(
`CREATE CHANGEFEED FOR auth_test_table INTO $1`,
fmt.Sprintf("%s?tls_enabled=true&insecure_tls_skip_verify=true", kafka.sinkURLTLS(ctx)),
).Scan(&jobID); err != nil {
t.Fatalf("create changefeed with insecure TLS transport: %s", err.Error())
}

if err := db.QueryRow(
`CREATE CHANGEFEED FOR auth_test_table INTO $1`,
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s", kafka.sinkURLTLS(ctx), testCerts.CACertBase64()),
).Scan(&jobID); err != nil {
t.Fatalf("create changefeed with TLS transport: %s", err.Error())
caCert := testCerts.CACertBase64()
saslURL := kafka.sinkURLSASL(ctx)
feeds := []struct {
desc string
queryArg string
}{
{
"create changefeed with insecure TLS transport and no auth",
fmt.Sprintf("%s?tls_enabled=true&insecure_tls_skip_verify=true", kafka.sinkURLTLS(ctx)),
},
{
"create changefeed with TLS transport and no auth",
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s", kafka.sinkURLTLS(ctx), testCerts.CACertBase64()),
},
{
"create changefeed with TLS transport and SASL/PLAIN (default mechanism)",
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=plain&sasl_password=plain-secret", saslURL, caCert),
},
{
"create changefeed with TLS transport and SASL/PLAIN (explicit mechanism)",
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=plain&sasl_password=plain-secret&sasl_mechanism=PLAIN", saslURL, caCert),
},
{
"create changefeed with TLS transport and SASL/SCRAM-SHA-256",
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=scram256&sasl_password=scram256-secret&sasl_mechanism=SCRAM-SHA-256", saslURL, caCert),
},
{
"create changefeed with TLS transport and SASL/SCRAM-SHA-512",
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=scram512&sasl_password=scram512-secret&sasl_mechanism=SCRAM-SHA-512", saslURL, caCert),
},
}

if err := db.QueryRow(
`CREATE CHANGEFEED FOR auth_test_table INTO $1`,
fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=plain&sasl_password=plain-secret", kafka.sinkURLSASL(ctx), testCerts.CACertBase64()),
).Scan(&jobID); err != nil {
t.Fatalf("create changefeed with TLS transport and SASL/PLAIN auth: %s", err.Error())
var jobID int
for _, f := range feeds {
t.Status(f.desc)
row := db.QueryRow(`CREATE CHANGEFEED FOR auth_test_table INTO $1`, f.queryArg)
if err := row.Scan(&jobID); err != nil {
t.Fatalf("%s: %s", f.desc, err.Error())
}
}
}

Expand Down Expand Up @@ -896,6 +916,10 @@ tar xvf /tmp/confluent.tar.gz -C "$CONFLUENT_DIR"
// user called "plain" with password "plain-secret" that can
// authenticate via SASL/PLAIN.
//
// Users to test SCRAM authentication are added via
// kafka-config commands as their credentials are stored in
// zookeeper.
//
// Newer versions of confluent configure this directly in
// server.properties.
//
Expand All @@ -909,6 +933,10 @@ KafkaServer {
password="admin-secret"
user_admin="admin-secret"
user_plain="plain-secret";

org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
`

Expand Down Expand Up @@ -969,6 +997,10 @@ func (k kafkaManager) configDir() string {
return k.basePath() + `/confluent-4.0.0/etc/kafka/`
}

func (k kafkaManager) binDir() string {
return k.basePath() + `/confluent-4.0.0/bin/`
}

func (k kafkaManager) serverJAASConfig() string {
return k.configDir() + `server_jaas.conf`
}
Expand Down Expand Up @@ -1075,6 +1107,23 @@ func (k kafkaManager) PutConfigContent(ctx context.Context, data string, path st
}
}

func (k kafkaManager) addSCRAMUsers(ctx context.Context) {
k.c.status("adding entries for SASL/SCRAM users")
k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"),
"--zookeeper", "localhost:2181",
"--alter",
"--add-config", "SCRAM-SHA-512=[password=scram512-secret]",
"--entity-type", "users",
"--entity-name", "scram512")

k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"),
"--zookeeper", "localhost:2181",
"--alter",
"--add-config", "SCRAM-SHA-256=[password=scram256-secret]",
"--entity-type", "users",
"--entity-name", "scram256")
}

func (k kafkaManager) start(ctx context.Context, services ...string) {
folder := k.basePath()
// This isn't necessary for the nightly tests, but it's nice for iteration.
Expand Down