diff --git a/go.mod b/go.mod index 1cedd9e56cbb..44a8e02791cb 100644 --- a/go.mod +++ b/go.mod @@ -144,6 +144,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/twpayne/go-geom v1.3.6 github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 go.etcd.io/etcd/raft/v3 v3.0.0-20210215124703-719f6ce06fbc golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad diff --git a/go.sum b/go.sum index 6d9cece01f4a..367d59157bc3 100644 --- a/go.sum +++ b/go.sum @@ -741,7 +741,9 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 53ad9c2466c8..469a0f749110 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "metrics.go", "name.go", "rowfetcher_cache.go", + "scram_client.go", "sink.go", "sink_cloudstorage.go", "testing_knobs.go", @@ -92,6 +93,7 @@ go_library( "@com_github_google_btree//:btree", "@com_github_linkedin_goavro_v2//:goavro", "@com_github_shopify_sarama//:sarama", + "@com_github_xdg_scram//:scram", ], ) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 23e3ac6dbe53..d57759f36e8e 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -2081,6 +2081,14 @@ func TestChangefeedErrors(t *testing.T) { t, `sasl_enabled must be enabled if a SASL password is provided`, `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_password=a`, ) + sqlDB.ExpectErr( + t, `sasl_enabled must be enabled to configure SASL mechanism`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_mechanism=SCRAM-SHA-256`, + ) + sqlDB.ExpectErr( + t, `param sasl_mechanism must be one of SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN`, + `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?sasl_enabled=true&sasl_mechanism=unsuppported`, + ) // The avro format doesn't support key_in_value yet. sqlDB.ExpectErr( diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 869ef118477a..cdaaa832996c 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -93,6 +93,7 @@ const ( SinkParamSASLHandshake = `sasl_handshake` SinkParamSASLUser = `sasl_user` SinkParamSASLPassword = `sasl_password` + SinkParamSASLMechanism = `sasl_mechanism` ) // ChangefeedOptionExpectValues is used to parse changefeed options using diff --git a/pkg/ccl/changefeedccl/scram_client.go b/pkg/ccl/changefeedccl/scram_client.go new file mode 100644 index 000000000000..6d2f5b7e044e --- /dev/null +++ b/pkg/ccl/changefeedccl/scram_client.go @@ -0,0 +1,61 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "crypto/sha256" + "crypto/sha512" + + "github.com/Shopify/sarama" + "github.com/xdg/scram" +) + +var ( + // sha256ClientGenerator returns a SCRAMClient for the + // SCRAM-SHA-256 SASL mechanism. This can used as a + // SCRAMCLientGeneratorFunc when constructing a sarama SASL + // configuration. + sha256ClientGenerator = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: sha256.New} + } + + // sha512ClientGenerator returns a SCRAMClient for the + // SCRAM-SHA-512 SASL mechanism. This can used as a + // SCRAMCLientGeneratorFunc when constructing a sarama SASL + // configuration. + sha512ClientGenerator = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: sha512.New} + } +) + +type scramClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +var _ sarama.SCRAMClient = &scramClient{} + +func (c *scramClient) Begin(userName, password, authzID string) error { + var err error + c.Client, err = c.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + c.ClientConversation = c.Client.NewConversation() + return nil +} + +func (c *scramClient) Step(challenge string) (string, error) { + return c.ClientConversation.Step(challenge) +} + +func (c *scramClient) Done() bool { + return c.ClientConversation.Done() +} diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 6db9adfcca1f..90d97f52d252 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -157,6 +157,23 @@ func getSink( } cfg.saslHandshake = b } + + cfg.saslMechanism = q.Get(changefeedbase.SinkParamSASLMechanism) + q.Del(changefeedbase.SinkParamSASLMechanism) + if cfg.saslMechanism != `` && !cfg.saslEnabled { + return nil, errors.Errorf(`%s must be enabled to configure SASL mechanism`, changefeedbase.SinkParamSASLEnabled) + } + if cfg.saslMechanism == `` { + cfg.saslMechanism = sarama.SASLTypePlaintext + } + switch cfg.saslMechanism { + case sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypePlaintext: + default: + return nil, errors.Errorf(`param %s must be one of %s, %s, or %s`, + changefeedbase.SinkParamSASLMechanism, + sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypePlaintext) + } + cfg.saslUser = q.Get(changefeedbase.SinkParamSASLUser) q.Del(changefeedbase.SinkParamSASLUser) cfg.saslPassword = q.Get(changefeedbase.SinkParamSASLPassword) @@ -305,6 +322,7 @@ type kafkaSinkConfig struct { saslHandshake bool saslUser string saslPassword string + saslMechanism string targetNames map[descpb.ID]string } @@ -393,6 +411,13 @@ func makeKafkaSink( config.Net.SASL.Handshake = cfg.saslHandshake config.Net.SASL.User = cfg.saslUser config.Net.SASL.Password = cfg.saslPassword + config.Net.SASL.Mechanism = sarama.SASLMechanism(cfg.saslMechanism) + switch config.Net.SASL.Mechanism { + case sarama.SASLTypeSCRAMSHA512: + config.Net.SASL.SCRAMClientGeneratorFunc = sha512ClientGenerator + case sarama.SASLTypeSCRAMSHA256: + config.Net.SASL.SCRAMClientGeneratorFunc = sha256ClientGenerator + } } // When we emit messages to sarama, they're placed in a queue (as does any diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 9d331a742270..926a2affa54a 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -506,6 +506,7 @@ func runCDCKafkaAuth(ctx context.Context, t *test, c *cluster) { kafka.install(ctx) testCerts := kafka.configureAuth(ctx) kafka.start(ctx, "kafka") + kafka.addSCRAMUsers(ctx) defer kafka.stop(ctx) db := c.Conn(ctx, 1) @@ -530,26 +531,45 @@ func runCDCKafkaAuth(ctx context.Context, t *test, c *cluster) { t.Fatal(err) } - var jobID int - if err := db.QueryRow( - `CREATE CHANGEFEED FOR auth_test_table INTO $1`, - fmt.Sprintf("%s?tls_enabled=true&insecure_tls_skip_verify=true", kafka.sinkURLTLS(ctx)), - ).Scan(&jobID); err != nil { - t.Fatalf("create changefeed with insecure TLS transport: %s", err.Error()) - } - - if err := db.QueryRow( - `CREATE CHANGEFEED FOR auth_test_table INTO $1`, - fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s", kafka.sinkURLTLS(ctx), testCerts.CACertBase64()), - ).Scan(&jobID); err != nil { - t.Fatalf("create changefeed with TLS transport: %s", err.Error()) + caCert := testCerts.CACertBase64() + saslURL := kafka.sinkURLSASL(ctx) + feeds := []struct { + desc string + queryArg string + }{ + { + "create changefeed with insecure TLS transport and no auth", + fmt.Sprintf("%s?tls_enabled=true&insecure_tls_skip_verify=true", kafka.sinkURLTLS(ctx)), + }, + { + "create changefeed with TLS transport and no auth", + fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s", kafka.sinkURLTLS(ctx), testCerts.CACertBase64()), + }, + { + "create changefeed with TLS transport and SASL/PLAIN (default mechanism)", + fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=plain&sasl_password=plain-secret", saslURL, caCert), + }, + { + "create changefeed with TLS transport and SASL/PLAIN (explicit mechanism)", + fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=plain&sasl_password=plain-secret&sasl_mechanism=PLAIN", saslURL, caCert), + }, + { + "create changefeed with TLS transport and SASL/SCRAM-SHA-256", + fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=scram256&sasl_password=scram256-secret&sasl_mechanism=SCRAM-SHA-256", saslURL, caCert), + }, + { + "create changefeed with TLS transport and SASL/SCRAM-SHA-512", + fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=scram512&sasl_password=scram512-secret&sasl_mechanism=SCRAM-SHA-512", saslURL, caCert), + }, } - if err := db.QueryRow( - `CREATE CHANGEFEED FOR auth_test_table INTO $1`, - fmt.Sprintf("%s?tls_enabled=true&ca_cert=%s&sasl_enabled=true&sasl_user=plain&sasl_password=plain-secret", kafka.sinkURLSASL(ctx), testCerts.CACertBase64()), - ).Scan(&jobID); err != nil { - t.Fatalf("create changefeed with TLS transport and SASL/PLAIN auth: %s", err.Error()) + var jobID int + for _, f := range feeds { + t.Status(f.desc) + row := db.QueryRow(`CREATE CHANGEFEED FOR auth_test_table INTO $1`, f.queryArg) + if err := row.Scan(&jobID); err != nil { + t.Fatalf("%s: %s", f.desc, err.Error()) + } } } @@ -896,6 +916,10 @@ tar xvf /tmp/confluent.tar.gz -C "$CONFLUENT_DIR" // user called "plain" with password "plain-secret" that can // authenticate via SASL/PLAIN. // + // Users to test SCRAM authentication are added via + // kafka-config commands as their credentials are stored in + // zookeeper. + // // Newer versions of confluent configure this directly in // server.properties. // @@ -909,6 +933,10 @@ KafkaServer { password="admin-secret" user_admin="admin-secret" user_plain="plain-secret"; + + org.apache.kafka.common.security.scram.ScramLoginModule required + username="admin" + password="admin-secret"; }; ` @@ -969,6 +997,10 @@ func (k kafkaManager) configDir() string { return k.basePath() + `/confluent-4.0.0/etc/kafka/` } +func (k kafkaManager) binDir() string { + return k.basePath() + `/confluent-4.0.0/bin/` +} + func (k kafkaManager) serverJAASConfig() string { return k.configDir() + `server_jaas.conf` } @@ -1075,6 +1107,23 @@ func (k kafkaManager) PutConfigContent(ctx context.Context, data string, path st } } +func (k kafkaManager) addSCRAMUsers(ctx context.Context) { + k.c.status("adding entries for SASL/SCRAM users") + k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"), + "--zookeeper", "localhost:2181", + "--alter", + "--add-config", "SCRAM-SHA-512=[password=scram512-secret]", + "--entity-type", "users", + "--entity-name", "scram512") + + k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"), + "--zookeeper", "localhost:2181", + "--alter", + "--add-config", "SCRAM-SHA-256=[password=scram256-secret]", + "--entity-type", "users", + "--entity-name", "scram256") +} + func (k kafkaManager) start(ctx context.Context, services ...string) { folder := k.basePath() // This isn't necessary for the nightly tests, but it's nice for iteration. diff --git a/vendor b/vendor index 310ea3cd09b5..7992c1461f16 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 310ea3cd09b55055b290f0a4cf9991a91725797e +Subproject commit 7992c1461f16bb4439700e13282bc1bb3bb30b55