From e40e737b367f564a09f29d09683aa33df18176d8 Mon Sep 17 00:00:00 2001 From: Ryan Min Date: Sun, 27 Jun 2021 22:37:53 -0400 Subject: [PATCH] changefeedccl: webhook sink implementation Adds support for a new enterprise webhook-style HTTP sink that sends JSON POST payloads for every row and resolved timestamp emitted by the changefeed. Configurable with new options and query params as mentioned in the corresponding RFC. Resolves #65816 Release note (enterprise change): Introduces new webhook sink (prefix 'webhook-https') to send individual changefeed messages as webhook events. --- pkg/ccl/changefeedccl/BUILD.bazel | 2 + pkg/ccl/changefeedccl/bench_test.go | 2 +- pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 1 + .../cdctest/mock_webhook_sink.go | 145 ++++++++++ pkg/ccl/changefeedccl/cdctest/testfeed.go | 2 +- pkg/ccl/changefeedccl/cdctest/tls_util.go | 37 +++ pkg/ccl/changefeedccl/changefeed_stmt.go | 28 +- pkg/ccl/changefeedccl/changefeed_test.go | 103 ++++++- .../changefeedccl/changefeedbase/options.go | 52 ++-- pkg/ccl/changefeedccl/encoder.go | 42 ++- pkg/ccl/changefeedccl/encoder_test.go | 42 +-- pkg/ccl/changefeedccl/helpers_test.go | 15 + pkg/ccl/changefeedccl/nemeses_test.go | 1 + .../show_changefeed_jobs_test.go | 52 +++- pkg/ccl/changefeedccl/sink.go | 40 ++- pkg/ccl/changefeedccl/sink_cloudstorage.go | 5 +- .../changefeedccl/sink_cloudstorage_test.go | 3 +- pkg/ccl/changefeedccl/sink_kafka.go | 39 +-- pkg/ccl/changefeedccl/sink_webhook.go | 262 +++++++++++++++++ pkg/ccl/changefeedccl/sink_webhook_test.go | 273 ++++++++++++++++++ pkg/ccl/changefeedccl/testfeed_test.go | 177 ++++++++++++ pkg/cmd/roachtest/BUILD.bazel | 1 + pkg/cmd/roachtest/cdc.go | 59 +++- 23 files changed, 1250 insertions(+), 133 deletions(-) create mode 100644 pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go create mode 100644 pkg/ccl/changefeedccl/sink_webhook.go create mode 100644 pkg/ccl/changefeedccl/sink_webhook_test.go diff --git a/pkg/ccl/changefeedccl/BUILD.bazel 
b/pkg/ccl/changefeedccl/BUILD.bazel index b8d7c958798c..db98dbf175a2 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "sink_cloudstorage.go", "sink_kafka.go", "sink_sql.go", + "sink_webhook.go", "testing_knobs.go", "tls.go", ], @@ -121,6 +122,7 @@ go_test( "show_changefeed_jobs_test.go", "sink_cloudstorage_test.go", "sink_test.go", + "sink_webhook_test.go", "testfeed_test.go", "validations_test.go", ], diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 61b6dee09902..b544d4de521a 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -199,7 +199,7 @@ func createBenchmarkChangefeed( }, } initialHighWater := hlc.Timestamp{} - encoder, err := makeJSONEncoder(details.Opts) + encoder, err := makeJSONEncoder(details.Opts, details.Targets) if err != nil { return nil, nil, err } diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index 674aa3485fb2..4d6ca03a7663 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "cdctest", srcs = [ + "mock_webhook_sink.go", "nemeses.go", "schema_registry.go", "testfeed.go", diff --git a/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go b/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go new file mode 100644 index 000000000000..aa69c2c5a1ad --- /dev/null +++ b/pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go @@ -0,0 +1,145 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdctest + +import ( + "crypto/tls" + "io/ioutil" + "net/http" + "net/http/httptest" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// MockWebhookSink is the Webhook sink used in tests. +type MockWebhookSink struct { + basicAuth bool + username, password string + server *httptest.Server + mu struct { + statusCode int + rows []string + syncutil.Mutex + } +} + +// StartMockWebhookSink creates and starts a mock webhook sink for tests. +func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error) { + s := makeMockWebhookSink() + if certificate != nil { + s.server.TLS = &tls.Config{ + Certificates: []tls.Certificate{*certificate}, + } + } + s.server.StartTLS() + return s, nil +} + +// StartMockWebhookSinkWithBasicAuth creates and starts a mock webhook sink for +// tests with basic username/password auth. +func StartMockWebhookSinkWithBasicAuth( + certificate *tls.Certificate, username, password string, +) (*MockWebhookSink, error) { + s := makeMockWebhookSink() + s.basicAuth = true + s.username = username + s.password = password + if certificate != nil { + s.server.TLS = &tls.Config{ + Certificates: []tls.Certificate{*certificate}, + } + } + s.server.StartTLS() + return s, nil +} + +func makeMockWebhookSink() *MockWebhookSink { + s := &MockWebhookSink{} + s.mu.statusCode = http.StatusOK + s.server = httptest.NewUnstartedServer(http.HandlerFunc(s.requestHandler)) + return s +} + +// URL returns the http address of this mock Webhook sink. +func (s *MockWebhookSink) URL() string { + return s.server.URL +} + +// SetStatusCode sets the HTTP status code to use when responding to a request. +// Useful for testing error handling behavior on client side. 
+func (s *MockWebhookSink) SetStatusCode(statusCode int) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.statusCode = statusCode +} + +// Close closes the mock Webhook sink. +func (s *MockWebhookSink) Close() { + s.server.Close() + s.server.CloseClientConnections() +} + +// Latest returns the most recent message received by the MockWebhookSink. +func (s *MockWebhookSink) Latest() string { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.mu.rows) == 0 { + return "" + } + latest := s.mu.rows[len(s.mu.rows)-1] + return latest +} + +// Pop deletes and returns the oldest message from MockWebhookSink +func (s *MockWebhookSink) Pop() string { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.mu.rows) > 0 { + oldest := s.mu.rows[0] + s.mu.rows = s.mu.rows[1:] + return oldest + } + return "" +} + +func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Request) { + method := hr.Method + + var err error + switch { + case method == http.MethodPost: + if s.basicAuth { + username, password, ok := hr.BasicAuth() + if !ok || s.username != username || s.password != password { + hw.WriteHeader(http.StatusUnauthorized) + return + } + } + err = s.publish(hw, hr) + default: + hw.WriteHeader(http.StatusNotFound) + return + } + if err != nil { + http.Error(hw, err.Error(), http.StatusInternalServerError) + } +} + +func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) error { + defer hr.Body.Close() + row, err := ioutil.ReadAll(hr.Body) + if err != nil { + return err + } + s.mu.Lock() + s.mu.rows = append(s.mu.rows, string(row)) + hw.WriteHeader(s.mu.statusCode) + s.mu.Unlock() + return nil +} diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index a67c9632ff22..f8f91e025989 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -48,7 +48,7 @@ type TestFeed interface { // by Next. Partitions() []string // Next returns the next message.
Within a given topic+partition, the order is - // preserved, but not otherwisete. Either len(key) and len(value) will be + // preserved, but not otherwise. Either len(key) and len(value) will be // greater than zero (a row updated) or len(payload) will be (a resolved // timestamp). Next() (*TestFeedMessage, error) diff --git a/pkg/ccl/changefeedccl/cdctest/tls_util.go b/pkg/ccl/changefeedccl/cdctest/tls_util.go index 63fd30a70a60..a2769869462f 100644 --- a/pkg/ccl/changefeedccl/cdctest/tls_util.go +++ b/pkg/ccl/changefeedccl/cdctest/tls_util.go @@ -11,6 +11,7 @@ package cdctest import ( "crypto/rand" "crypto/rsa" + "crypto/tls" "crypto/x509" "crypto/x509/pkix" "encoding/base64" @@ -34,6 +35,42 @@ func EncodeBase64ToString(src []byte, dest *string) { } } +// NewCACertBase64Encoded generates a new CA cert and returns the +// cert object as well as a base 64 encoded PEM version. +func NewCACertBase64Encoded() (*tls.Certificate, string, error) { + keyLength := 2048 + + caKey, err := rsa.GenerateKey(rand.Reader, keyLength) + if err != nil { + return nil, "", errors.Wrap(err, "CA private key") + } + + caCert, _, err := GenerateCACert(caKey) + if err != nil { + return nil, "", errors.Wrap(err, "CA cert gen") + } + + caKeyPEM, err := PemEncodePrivateKey(caKey) + if err != nil { + return nil, "", errors.Wrap(err, "pem encode CA key") + } + + caCertPEM, err := PemEncodeCert(caCert) + if err != nil { + return nil, "", errors.Wrap(err, "pem encode CA cert") + } + + cert, err := tls.X509KeyPair([]byte(caCertPEM), []byte(caKeyPEM)) + if err != nil { + return nil, "", errors.Wrap(err, "CA cert parse from PEM") + } + + var caCertBase64 string + EncodeBase64ToString([]byte(caCertPEM), &caCertBase64) + + return &cert, caCertBase64, nil +} + // GenerateCACert generates a new self-signed CA cert using priv func GenerateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) { serial, err := randomSerial() diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go 
b/pkg/ccl/changefeedccl/changefeed_stmt.go index 4f8a0f2d4736..2b0b586d5bd4 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -242,14 +242,18 @@ func changefeedPlanHook( // and `format` if the user didn't specify them. // - Then `getEncoder` is run to return any configuration errors. // - Then the changefeed is opted in to `OptKeyInValue` for any cloud - // storage sink. Kafka etc have a key and value field in each message but - // cloud storage sinks don't have anywhere to put the key. So if the key - // is not in the value, then for DELETEs there is no way to recover which - // key was deleted. We could make the user explicitly pass this option for - // every cloud storage sink and error if they don't, but that seems - // user-hostile for insufficient reason. We can't do this any earlier, - // because we might return errors about `key_in_value` being incompatible - // which is confusing when the user didn't type that option. + // storage sink or webhook sink. Kafka etc have a key and value field in + // each message but cloud storage sinks and webhook sinks don't have + // anywhere to put the key. So if the key is not in the value, then for + // DELETEs there is no way to recover which key was deleted. We could make + // the user explicitly pass this option for every cloud storage sink/ + // webhook sink and error if they don't, but that seems user-hostile for + // insufficient reason. We can't do this any earlier, because we might + // return errors about `key_in_value` being incompatible which is + // confusing when the user didn't type that option. + // This is the same for the topic and webhook sink, which uses + // `topic_in_value` to embed the topic in the value by default, since it + // has no other avenue to express the topic. // - Finally, we create a "canary" sink to test sink configuration and // connectivity. 
This has to go last because it is strange to return sink // connectivity errors before we've finished validating all the other @@ -270,9 +274,12 @@ func changefeedPlanHook( if _, err := getEncoder(ctx, details.Opts, details.Targets); err != nil { return err } - if isCloudStorageSink(parsedSink) { + if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) { details.Opts[changefeedbase.OptKeyInValue] = `` } + if isWebhookSink(parsedSink) { + details.Opts[changefeedbase.OptTopicInValue] = `` + } if !unspecifiedSink && p.ExecCfg().ExternalIODirConfig.DisableOutbound { return errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme) @@ -424,6 +431,9 @@ func changefeedJobDescription( SinkURI: tree.NewDString(cleanedSinkURI), } for k, v := range opts { + if k == changefeedbase.OptWebhookAuthHeader { + v = redactWebhookAuthHeader(v) + } opt := tree.KVOption{Key: tree.Name(k)} if len(v) > 0 { opt.Value = tree.NewDString(v) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 0beb74e805e4..a7d6007ca321 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -120,6 +120,7 @@ func TestChangefeedBasics(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) // NB running TestChangefeedBasics, which includes a DELETE, with // cloudStorageTest is a regression test for #36994. 
@@ -212,6 +213,7 @@ func TestChangefeedDiff(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedTenants(t *testing.T) { @@ -330,6 +332,7 @@ func TestChangefeedFullTableName(t *testing.T) { //t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedMultiTable(t *testing.T) { @@ -355,6 +358,7 @@ func TestChangefeedMultiTable(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedCursor(t *testing.T) { @@ -415,6 +419,7 @@ func TestChangefeedCursor(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedTimestamps(t *testing.T) { @@ -477,6 +482,7 @@ func TestChangefeedTimestamps(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedMVCCTimestamps(t *testing.T) { @@ -507,6 +513,7 @@ func TestChangefeedMVCCTimestamps(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedResolvedFrequency(t *testing.T) { @@ -539,6 +546,7 @@ func TestChangefeedResolvedFrequency(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } // Test how Changefeeds react to schema changes that do not require a backfill @@ -587,6 +595,7 @@ func 
TestChangefeedInitialScan(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedUserDefinedTypes(t *testing.T) { @@ -647,6 +656,7 @@ func TestChangefeedUserDefinedTypes(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedExternalIODisabled(t *testing.T) { @@ -879,6 +889,7 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) log.Flush() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) @@ -1064,6 +1075,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) log.Flush() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) @@ -1119,6 +1131,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) log.Flush() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) @@ -1220,6 +1233,7 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) log.Flush() entries, 
err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) @@ -1315,6 +1329,7 @@ func TestChangefeedColumnFamily(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedAuthorization(t *testing.T) { @@ -1448,6 +1463,7 @@ func TestChangefeedFailOnTableOffline(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedFailOnRBRChange(t *testing.T) { @@ -1489,6 +1505,7 @@ func TestChangefeedFailOnRBRChange(t *testing.T) { t.Run(`enterprise`, enterpriseTestWithServerArgs(withTestServerRegion, testFn)) t.Run(`cloudstorage`, cloudStorageTestWithServerArg(withTestServerRegion, testFn)) t.Run(`kafka`, kafkaTestWithServerArgs(withTestServerRegion, testFn)) + t.Run(`webhook`, webhookTestWithServerArgs(withTestServerRegion, testFn)) } func TestChangefeedStopOnSchemaChange(t *testing.T) { @@ -1636,6 +1653,7 @@ func TestChangefeedStopOnSchemaChange(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedNoBackfill(t *testing.T) { @@ -1753,6 +1771,7 @@ func TestChangefeedNoBackfill(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedComputedColumn(t *testing.T) { @@ -1783,6 +1802,7 @@ func TestChangefeedComputedColumn(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedUpdatePrimaryKey(t *testing.T) { @@ -1817,6 
+1837,7 @@ func TestChangefeedUpdatePrimaryKey(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedTruncateOrDrop(t *testing.T) { @@ -1880,6 +1901,7 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedMonitoring(t *testing.T) { @@ -2118,6 +2140,7 @@ func TestChangefeedRetryableError(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } // TestChangefeedDataTTL ensures that changefeeds fail with an error in the case @@ -2309,6 +2332,7 @@ func TestChangefeedSchemaTTL(t *testing.T) { t.Run("enterprise", enterpriseTest(testFn)) t.Run("cloudstorage", cloudStorageTest(testFn)) t.Run("kafka", kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedErrors(t *testing.T) { @@ -2570,12 +2594,17 @@ func TestChangefeedErrors(t *testing.T) { t, `client has run out of available brokers`, `CREATE CHANGEFEED FOR foo INTO 'kafka://nope/' WITH kafka_sink_config='{"Flush": {"Messages": 100, "Frequency": "1s"}}'`, ) - // The avro format doesn't support key_in_value yet. + // The avro format doesn't support key_in_value or topic_in_value yet. sqlDB.ExpectErr( t, `key_in_value is not supported with format=experimental_avro`, `CREATE CHANGEFEED FOR foo INTO $1 WITH key_in_value, format='experimental_avro'`, `kafka://nope`, ) + sqlDB.ExpectErr( + t, `topic_in_value is not supported with format=experimental_avro`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH topic_in_value, format='experimental_avro'`, + `kafka://nope`, + ) // The cloudStorageSink is particular about the options it will work with. 
sqlDB.ExpectErr( @@ -2599,6 +2628,16 @@ func TestChangefeedErrors(t *testing.T) { `CREATE CHANGEFEED FOR foo INTO $1 WITH key_in_value, envelope='row'`, `kafka://nope`, ) + // WITH topic_in_value requires envelope=wrapped + sqlDB.ExpectErr( + t, `topic_in_value is only usable with envelope=wrapped`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH topic_in_value, envelope='key_only'`, `kafka://nope`, + ) + sqlDB.ExpectErr( + t, `topic_in_value is only usable with envelope=wrapped`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH topic_in_value, envelope='row'`, `kafka://nope`, + ) + // WITH diff requires envelope=wrapped sqlDB.ExpectErr( t, `diff is only usable with envelope=wrapped`, @@ -2630,6 +2669,59 @@ func TestChangefeedErrors(t *testing.T) { `CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`, `kafka://nope`, `https://schemareg-nope/?ca_cert=Zm9v`, ) + + // Sanity check http sink options. + sqlDB.ExpectErr( + t, `unsupported sink: https. HTTP endpoints can be used with webhook-https and experimental-https`, + `CREATE CHANGEFEED FOR foo INTO $1`, `https://fake-host`, + ) + sqlDB.ExpectErr( + t, `param insecure_tls_skip_verify must be a bool`, + `CREATE CHANGEFEED FOR foo INTO $1`, `webhook-https://fake-host?insecure_tls_skip_verify=foo`, + ) + sqlDB.ExpectErr( + t, `param ca_cert must be base 64 encoded`, + `CREATE CHANGEFEED FOR foo INTO $1`, `webhook-https://fake-host?ca_cert=?`, + ) + sqlDB.ExpectErr( + t, `failed to parse certificate data`, + `CREATE CHANGEFEED FOR foo INTO $1`, `webhook-https://fake-host?ca_cert=Zm9v`, + ) + sqlDB.ExpectErr( + t, `sink requires https`, + `CREATE CHANGEFEED FOR foo INTO $1`, `webhook-http://fake-host`, + ) + sqlDB.ExpectErr( + t, `this sink is incompatible with format=experimental_avro`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`, + `webhook-https://fake-host`, schemaReg.URL(), + ) + sqlDB.ExpectErr( + t, `problem parsing option 
webhook_client_timeout: time: invalid duration "not_an_integer"`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_client_timeout='not_an_integer'`, `webhook-https://fake-host`, + ) + sqlDB.ExpectErr( + t, `option webhook_client_timeout must be a positive duration`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_client_timeout='0s'`, `webhook-https://fake-host`, + ) + sqlDB.ExpectErr( + t, `option webhook_client_timeout must be a positive duration`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_client_timeout='-500s'`, `webhook-https://fake-host`, + ) + sqlDB.ExpectErr( + t, `problem parsing option webhook_client_timeout: time: missing unit in duration`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_client_timeout='0.5'`, `webhook-https://fake-host`, + ) + sqlDB.ExpectErr( + t, `this sink is incompatible with envelope=key_only`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH envelope='key_only'`, + `webhook-https://fake-host`, + ) + sqlDB.ExpectErr( + t, `this sink is incompatible with envelope=row`, + `CREATE CHANGEFEED FOR foo INTO $1 WITH envelope='row'`, + `webhook-https://fake-host`, + ) } func TestChangefeedDescription(t *testing.T) { @@ -2722,6 +2814,7 @@ func TestChangefeedPauseUnpause(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedPauseUnpauseCursorAndInitialScan(t *testing.T) { @@ -2766,6 +2859,7 @@ func TestChangefeedPauseUnpauseCursorAndInitialScan(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedProtectedTimestamps(t *testing.T) { @@ -3009,6 +3103,7 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn(shouldPause))) t.Run(`cloudstorage`, cloudStorageTest(testFn(shouldPause))) 
t.Run(`kafka`, kafkaTest(testFn(shouldPause))) + t.Run(`webhook`, webhookTest(testFn(shouldPause))) }) } @@ -3070,6 +3165,7 @@ func TestChangefeedProtectedTimestampsVerificationFails(t *testing.T) { t.Run(`enterprise`, enterpriseTestWithServerArgs(setStoreKnobs, testFn)) t.Run(`cloudstorage`, cloudStorageTestWithServerArg(setStoreKnobs, testFn)) t.Run(`kafka`, kafkaTestWithServerArgs(setStoreKnobs, testFn)) + t.Run(`webhook`, webhookTestWithServerArgs(setStoreKnobs, testFn)) } func TestManyChangefeedsOneTable(t *testing.T) { @@ -3125,6 +3221,7 @@ func TestManyChangefeedsOneTable(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestUnspecifiedPrimaryKey(t *testing.T) { @@ -3543,6 +3640,7 @@ INSERT INTO foo VALUES (1, 'f'); t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } // Primary key changes are supported by changefeeds starting in 21.1. This test @@ -3643,6 +3741,7 @@ INSERT INTO bar VALUES (6, 'f'); t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) t.Run(`kafka`, kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } // Primary key changes are supported by changefeeds starting in 21.1. 
@@ -3685,6 +3784,7 @@ func TestChangefeedPrimaryKeyChangeMixedVersion(t *testing.T) { t.Run("enterprise", enterpriseTestWithServerArgs(setMixedVersion, testFn)) t.Run("cloudstorage", cloudStorageTestWithServerArg(setMixedVersion, testFn)) t.Run("kafka", kafkaTestWithServerArgs(setMixedVersion, testFn)) + t.Run("webhook", webhookTestWithServerArgs(setMixedVersion, testFn)) } // TestChangefeedCheckpointSchemaChange tests to make sure that writes that @@ -3830,6 +3930,7 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) { t.Run("enterprise", enterpriseTest(testFn)) t.Run("cloudstorage", cloudStorageTest(testFn)) t.Run("kafka", kafkaTest(testFn)) + t.Run(`webhook`, webhookTest(testFn)) } func TestChangefeedBackfillCheckpoint(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 238d0503ec32..a8a2ee942eb3 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -33,6 +33,7 @@ const ( OptFormat = `format` OptFullTableName = `full_table_name` OptKeyInValue = `key_in_value` + OptTopicInValue = `topic_in_value` OptResolvedTimestamps = `resolved` OptUpdatedTimestamps = `updated` OptMVCCTimestamps = `mvcc_timestamp` @@ -41,6 +42,8 @@ const ( OptSchemaChangeEvents = `schema_change_events` OptSchemaChangePolicy = `schema_change_policy` OptProtectDataFromGCOnPause = `protect_data_from_gc_on_pause` + OptWebhookAuthHeader = `webhook_auth_header` + OptWebhookClientTimeout = `webhook_client_timeout` // OptSchemaChangeEventClassColumnChange corresponds to all schema change // events which add or remove any column. @@ -88,24 +91,34 @@ const ( // OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig). 
OptKafkaSinkConfig = `kafka_sink_config` - SinkParamCACert = `ca_cert` - SinkParamClientCert = `client_cert` - SinkParamClientKey = `client_key` - SinkParamFileSize = `file_size` - SinkParamSchemaTopic = `schema_topic` - SinkParamTLSEnabled = `tls_enabled` - SinkParamSkipTLSVerify = `insecure_tls_skip_verify` - SinkParamTopicPrefix = `topic_prefix` - SinkParamTopicName = `topic_name` - SinkSchemeBuffer = `` - SinkSchemeExperimentalSQL = `experimental-sql` - SinkSchemeKafka = `kafka` - SinkSchemeNull = `null` - SinkParamSASLEnabled = `sasl_enabled` - SinkParamSASLHandshake = `sasl_handshake` - SinkParamSASLUser = `sasl_user` - SinkParamSASLPassword = `sasl_password` - SinkParamSASLMechanism = `sasl_mechanism` + SinkParamCACert = `ca_cert` + SinkParamClientCert = `client_cert` + SinkParamClientKey = `client_key` + SinkParamFileSize = `file_size` + SinkParamSchemaTopic = `schema_topic` + SinkParamTLSEnabled = `tls_enabled` + SinkParamSkipTLSVerify = `insecure_tls_skip_verify` + SinkParamTopicPrefix = `topic_prefix` + SinkParamTopicName = `topic_name` + SinkSchemeBuffer = `` + SinkSchemeCloudStorageAzure = `experimental-azure` + SinkSchemeCloudStorageGCS = `experimental-gs` + SinkSchemeCloudStorageHTTP = `experimental-http` + SinkSchemeCloudStorageHTTPS = `experimental-https` + SinkSchemeCloudStorageNodelocal = `experimental-nodelocal` + SinkSchemeCloudStorageS3 = `experimental-s3` + SinkSchemeExperimentalSQL = `experimental-sql` + SinkSchemeHTTP = `http` + SinkSchemeHTTPS = `https` + SinkSchemeKafka = `kafka` + SinkSchemeNull = `null` + SinkSchemeWebhookHTTP = `webhook-http` + SinkSchemeWebhookHTTPS = `webhook-https` + SinkParamSASLEnabled = `sasl_enabled` + SinkParamSASLHandshake = `sasl_handshake` + SinkParamSASLUser = `sasl_user` + SinkParamSASLPassword = `sasl_password` + SinkParamSASLMechanism = `sasl_mechanism` RegistryParamCACert = `ca_cert` ) @@ -120,6 +133,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{ OptFormat: 
sql.KVStringOptRequireValue, OptFullTableName: sql.KVStringOptRequireNoValue, OptKeyInValue: sql.KVStringOptRequireNoValue, + OptTopicInValue: sql.KVStringOptRequireNoValue, OptResolvedTimestamps: sql.KVStringOptAny, OptUpdatedTimestamps: sql.KVStringOptRequireNoValue, OptMVCCTimestamps: sql.KVStringOptRequireNoValue, @@ -131,4 +145,6 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{ OptNoInitialScan: sql.KVStringOptRequireNoValue, OptProtectDataFromGCOnPause: sql.KVStringOptRequireNoValue, OptKafkaSinkConfig: sql.KVStringOptRequireValue, + OptWebhookAuthHeader: sql.KVStringOptRequireValue, + OptWebhookClientTimeout: sql.KVStringOptRequireValue, } diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 3f7f7c821bf4..e5b5db5ba1ca 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -13,6 +13,7 @@ import ( "context" "encoding/binary" gojson "encoding/json" + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" @@ -84,7 +85,7 @@ func getEncoder( ) (Encoder, error) { switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) { case ``, changefeedbase.OptFormatJSON: - return makeJSONEncoder(opts) + return makeJSONEncoder(opts, targets) case changefeedbase.OptFormatAvro: return newConfluentAvroEncoder(ctx, opts, targets) case changefeedbase.OptFormatNative: @@ -99,16 +100,20 @@ func getEncoder( // to its value. Updated timestamps in rows and resolved timestamp payloads are // stored in a sub-object under the `__crdb__` key in the top-level JSON object. 
type jsonEncoder struct { - updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue bool + updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool - alloc rowenc.DatumAlloc - buf bytes.Buffer + targets jobspb.ChangefeedTargets + alloc rowenc.DatumAlloc + buf bytes.Buffer } var _ Encoder = &jsonEncoder{} -func makeJSONEncoder(opts map[string]string) (*jsonEncoder, error) { +func makeJSONEncoder( + opts map[string]string, targets jobspb.ChangefeedTargets, +) (*jsonEncoder, error) { e := &jsonEncoder{ + targets: targets, keyOnly: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeKeyOnly, wrapped: changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) == changefeedbase.OptEnvelopeWrapped, } @@ -124,6 +129,11 @@ func makeJSONEncoder(opts map[string]string) (*jsonEncoder, error) { return nil, errors.Errorf(`%s is only usable with %s=%s`, changefeedbase.OptKeyInValue, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) } + _, e.topicInValue = opts[changefeedbase.OptTopicInValue] + if e.topicInValue && !e.wrapped { + return nil, errors.Errorf(`%s is only usable with %s=%s`, + changefeedbase.OptTopicInValue, changefeedbase.OptEnvelope, changefeedbase.OptEnvelopeWrapped) + } return e, nil } @@ -166,6 +176,17 @@ func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) { return jsonEntries, nil } +func (e *jsonEncoder) encodeTopicRaw(row encodeRow) (interface{}, error) { + descID := row.tableDesc.GetID() + // use the target list since row.tableDesc.GetName() will not have fully qualified names + topicName, ok := e.targets[descID] + if !ok { + return nil, fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list", + row.tableDesc.GetName(), descID) + } + return topicName.StatementTimeName, nil +} + // EncodeValue implements the Encoder interface. 
func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, error) { if e.keyOnly || (!e.wrapped && row.deleted) { @@ -227,6 +248,13 @@ func (e *jsonEncoder) EncodeValue(_ context.Context, row encodeRow) ([]byte, err } jsonEntries[`key`] = keyEntries } + if e.topicInValue { + topicEntry, err := e.encodeTopicRaw(row) + if err != nil { + return nil, err + } + jsonEntries[`topic`] = topicEntry + } } else { jsonEntries = after } @@ -338,6 +366,10 @@ func newConfluentAvroEncoder( return nil, errors.Errorf(`%s is not supported with %s=%s`, changefeedbase.OptKeyInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) } + if _, ok := opts[changefeedbase.OptTopicInValue]; ok { + return nil, errors.Errorf(`%s is not supported with %s=%s`, + changefeedbase.OptTopicInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) + } if len(opts[changefeedbase.OptConfluentSchemaRegistry]) == 0 { return nil, errors.Errorf(`WITH option %s is required for %s=%s`, changefeedbase.OptConfluentSchemaRegistry, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index 09be14e0f8bc..a252bf0f7a19 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -10,9 +10,6 @@ package changefeedccl import ( "context" - "crypto/rand" - "crypto/rsa" - "crypto/tls" gosql "database/sql" "fmt" "net/url" @@ -31,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/workload/ledger" "github.com/cockroachdb/cockroach/pkg/workload/workloadsql" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -318,40 +314,6 @@ func TestAvroEncoder(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) } -func newCACertBase64Encoded() (*tls.Certificate, string, error) { - keyLength := 2048 - - CAKey, err := rsa.GenerateKey(rand.Reader, keyLength) - if err != nil { - return nil, "", 
errors.Wrap(err, "CA private key") - } - - CACert, _, err := cdctest.GenerateCACert(CAKey) - if err != nil { - return nil, "", errors.Wrap(err, "CA cert gen") - } - - CAKeyPEM, err := cdctest.PemEncodePrivateKey(CAKey) - if err != nil { - return nil, "", errors.Wrap(err, "pem encode CA key") - } - - CACertPEM, err := cdctest.PemEncodeCert(CACert) - if err != nil { - return nil, "", errors.Wrap(err, "pem encode CA cert") - } - - cert, err := tls.X509KeyPair([]byte(CACertPEM), []byte(CAKeyPEM)) - if err != nil { - return nil, "", errors.Wrap(err, "CA cert parse from PEM") - } - - var CACertBase64 string - cdctest.EncodeBase64ToString([]byte(CACertPEM), &CACertBase64) - - return &cert, CACertBase64, nil -} - func TestAvroEncoderWithTLS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -379,7 +341,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { } t.Run("format=experimental_avro,envelope=key_only", func(t *testing.T) { - cert, certBase64, err := newCACertBase64Encoded() + cert, certBase64, err := cdctest.NewCACertBase64Encoded() require.NoError(t, err) var rowStringFn func([]byte, []byte) string @@ -455,7 +417,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { "Get \"%s/mode\": x509: certificate signed by unknown authority", opts[changefeedbase.OptConfluentSchemaRegistry])) - wrongCert, _, err := newCACertBase64Encoded() + wrongCert, _, err := cdctest.NewCACertBase64Encoded() require.NoError(t, err) wrongCertReg, err := cdctest.StartTestSchemaRegistryWithTLS(wrongCert) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 24f726b696e0..bbb77a60d2fc 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -479,6 +479,21 @@ func kafkaTestWithServerArgs( } } +func webhookTest(testFn cdcTestFn) func(t *testing.T) { + return webhookTestWithServerArgs(nil, testFn) +} + +func webhookTestWithServerArgs( + argsFn func(args *base.TestServerArgs), testFn cdcTestFn, +) 
func(*testing.T) { + return func(t *testing.T) { + s, db, stopServer := startTestServer(t, argsFn) + defer stopServer() + f := makeWebhookFeedFactory(s, db) + testFn(t, db, f) + } +} + func feed( t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{}, ) cdctest.TestFeed { diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 724878bd55aa..53dcaf818c6a 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -47,6 +47,7 @@ func TestChangefeedNemeses(t *testing.T) { t.Run(`sinkless`, sinklessTest(testFn, feedTestNoTenants)) t.Run(`enterprise`, enterpriseTest(testFn)) t.Run(`cloudstorage`, cloudStorageTest(testFn)) + t.Run(`http`, webhookTest(testFn)) log.Flush() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 75480442a3ac..89e2f716c38a 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -74,8 +74,9 @@ func TestShowChangefeedJobs(t *testing.T) { type row struct { id jobspb.JobID SinkURI string - FulLTableNames []uint8 + FullTableNames []uint8 format string + description string DescriptorIDs []descpb.ID } @@ -108,24 +109,28 @@ func TestShowChangefeedJobs(t *testing.T) { query = `SET CLUSTER SETTING kv.rangefeed.enabled = true` sqlDB.Exec(t, query) - var changefeedID jobspb.JobID + var singleChangefeedID, multiChangefeedID jobspb.JobID + + query = `CREATE CHANGEFEED FOR TABLE foo INTO + 'webhook-https://fake-http-sink:8081' WITH webhook_auth_header='Basic Zm9v'` + sqlDB.QueryRow(t, query).Scan(&singleChangefeedID) // Cannot use kafka for tests right now because of leaked goroutine issue - query = `CREATE CHANGEFEED FOR TABLE foo, bar INTO - 
'experimental-http://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'` - sqlDB.QueryRow(t, query).Scan(&changefeedID) + query = `CREATE CHANGEFEED FOR TABLE foo, bar INTO + 'experimental-s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'` + sqlDB.QueryRow(t, query).Scan(&multiChangefeedID) var out row query = `SELECT job_id, sink_uri, full_table_names, format FROM [SHOW CHANGEFEED JOB $1]` - sqlDB.QueryRow(t, query, changefeedID).Scan(&out.id, &out.SinkURI, &out.FulLTableNames, &out.format) + sqlDB.QueryRow(t, query, multiChangefeedID).Scan(&out.id, &out.SinkURI, &out.FullTableNames, &out.format) - require.Equal(t, changefeedID, out.id, "Expected id:%d but found id:%d", changefeedID, out.id) - require.Equal(t, "experimental-http://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", "experimental-http://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI) - require.Equal(t, "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FulLTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FulLTableNames)) + require.Equal(t, multiChangefeedID, out.id, "Expected id:%d but found id:%d", multiChangefeedID, out.id) + require.Equal(t, "experimental-s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", "experimental-s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI) + require.Equal(t, "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FullTableNames)) require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", 
out.format) - query = `SELECT job_id, sink_uri, full_table_names, format FROM [SHOW CHANGEFEED JOBS] ORDER BY job_id` + query = `SELECT job_id, description, sink_uri, full_table_names, format FROM [SHOW CHANGEFEED JOBS] ORDER BY sink_uri` rowResults := sqlDB.Query(t, query) if !rowResults.Next() { @@ -136,14 +141,33 @@ func TestShowChangefeedJobs(t *testing.T) { t.Fatalf("Expected more rows when querying and none found for query: %s", query) } } - err := rowResults.Scan(&out.id, &out.SinkURI, &out.FulLTableNames, &out.format) + err := rowResults.Scan(&out.id, &out.description, &out.SinkURI, &out.FullTableNames, &out.format) + if err != nil { + t.Fatal(err) + } + + require.Equal(t, multiChangefeedID, out.id, "Expected id:%d but found id:%d", multiChangefeedID, out.id) + require.Equal(t, "experimental-s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", "experimental-s3://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI) + require.Equal(t, "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FullTableNames)) + require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) + + if !rowResults.Next() { + err := rowResults.Err() + if err != nil { + t.Fatalf("Error encountered while querying the next row: %v", err) + } else { + t.Fatalf("Expected more rows when querying and none found for query: %s", query) + } + } + err = rowResults.Scan(&out.id, &out.description, &out.SinkURI, &out.FullTableNames, &out.format) if err != nil { t.Fatal(err) } - require.Equal(t, changefeedID, out.id, "Expected id:%d but found id:%d", changefeedID, out.id) - require.Equal(t, "experimental-http://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", 
out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", "experimental-http://fake-bucket-name/fake/path?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=redacted", out.SinkURI) - require.Equal(t, "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FulLTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{defaultdb.public.foo,defaultdb.public.bar}", string(out.FulLTableNames)) + require.Equal(t, singleChangefeedID, out.id, "Expected id:%d but found id:%d", singleChangefeedID, out.id) + require.Equal(t, "CREATE CHANGEFEED FOR TABLE foo INTO 'webhook-https://fake-http-sink:8081' WITH webhook_auth_header = 'redacted'", out.description, "Expected description:%s but found description:%s", "CREATE CHANGEFEED FOR TABLE foo INTO 'webhook-https://fake-http-sink:8081' WITH webhook_auth_header = 'redacted'", out.description) + require.Equal(t, "webhook-https://fake-http-sink:8081", out.SinkURI, "Expected sinkUri:%s but found sinkUri:%s", "webhook-https://fake-http-sink:8081", out.SinkURI) + require.Equal(t, "{defaultdb.public.foo}", string(out.FullTableNames), "Expected fullTableNames:%s but found fullTableNames:%s", "{defaultdb.public.foo}", string(out.FullTableNames)) require.Equal(t, "json", out.format, "Expected format:%s but found format:%s", "json", out.format) hasNext := rowResults.Next() diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index c993ce6ceba9..2514e9288658 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -81,6 +81,8 @@ func getSink( return makeNullSink(sinkURL{URL: u}) case u.Scheme == changefeedbase.SinkSchemeKafka: return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, acc) + case isWebhookSink(u): + return makeWebhookSink(ctx, sinkURL{URL: u}, feedCfg.Opts) case isCloudStorageSink(u): return makeCloudStorageSink( ctx, sinkURL{URL: u}, serverCfg.NodeID.SQLInstanceID(), serverCfg.Settings, @@ -88,6 +90,9 @@ func getSink( ) case u.Scheme == 
changefeedbase.SinkSchemeExperimentalSQL: return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, feedCfg.Targets) + case u.Scheme == changefeedbase.SinkSchemeHTTP || u.Scheme == changefeedbase.SinkSchemeHTTPS: + return nil, errors.Errorf(`unsupported sink: %s. HTTP endpoints can be used with %s and %s`, + u.Scheme, changefeedbase.SinkSchemeWebhookHTTPS, changefeedbase.SinkSchemeCloudStorageHTTPS) default: return nil, errors.Errorf(`unsupported sink: %s`, u.Scheme) } @@ -125,6 +130,31 @@ func (u *sinkURL) consumeParam(p string) string { return v } +func (u *sinkURL) consumeBool(param string, dest *bool) (wasSet bool, err error) { + if paramVal := u.consumeParam(param); paramVal != "" { + wasSet, err := strToBool(paramVal, dest) + if err != nil { + return false, errors.Wrapf(err, "param %s must be a bool", param) + } + return wasSet, err + } + return false, nil +} + +func (u *sinkURL) decodeBase64(param string, dest *[]byte) error { + // TODO(dan): There's a straightforward and unambiguous transformation + // between the base 64 encoding defined in RFC 4648 and the URL variant + // defined in the same RFC: simply replace all `+` with `-` and `/` with + // `_`. Consider always doing this for the user and accepting either + // variant. + val := u.consumeParam(param) + err := decodeBase64FromString(val, dest) + if err != nil { + return errors.Wrapf(err, `param %s must be base 64 encoded`, param) + } + return nil +} + func (u *sinkURL) remainingQueryParams() (res []string) { for p := range u.q { res = append(res, p) @@ -299,7 +329,7 @@ func (n *nullSink) pace(ctx context.Context) error { return nil } -// EmitRow implements SInk interface. +// EmitRow implements Sink interface. func (n *nullSink) EmitRow( ctx context.Context, topic TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { @@ -312,7 +342,7 @@ func (n *nullSink) EmitRow( return nil } -// EmitResolvedTimestamp implements SInk interface. +// EmitResolvedTimestamp implements Sink interface. 
func (n *nullSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { @@ -326,7 +356,7 @@ func (n *nullSink) EmitResolvedTimestamp( return nil } -// Flush implements SInk interface. +// Flush implements Sink interface. func (n *nullSink) Flush(ctx context.Context) error { if log.V(2) { log.Info(ctx, "flushing") @@ -335,12 +365,12 @@ func (n *nullSink) Flush(ctx context.Context) error { return nil } -// Close implements SInk interface. +// Close implements Sink interface. func (n *nullSink) Close() error { return nil } -// Dial implements SInk interface. +// Dial implements Sink interface. func (n *nullSink) Dial() error { return nil } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 093936fa7375..54d741c01093 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -36,8 +36,9 @@ import ( func isCloudStorageSink(u *url.URL) bool { switch u.Scheme { - case `experimental-s3`, `experimental-gs`, `experimental-nodelocal`, `experimental-http`, - `experimental-https`, `experimental-azure`: + case changefeedbase.SinkSchemeCloudStorageS3, changefeedbase.SinkSchemeCloudStorageGCS, + changefeedbase.SinkSchemeCloudStorageNodelocal, changefeedbase.SinkSchemeCloudStorageHTTP, + changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeCloudStorageAzure: return true default: return false diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index c87257e401c6..d250bfc4a83c 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" 
"github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -108,7 +109,7 @@ func TestCloudStorageSink(t *testing.T) { changefeedbase.OptCompression: ``, // NB: overridden in single-node subtest. } ts := func(i int64) hlc.Timestamp { return hlc.Timestamp{WallTime: i} } - e, err := makeJSONEncoder(opts) + e, err := makeJSONEncoder(opts, jobspb.ChangefeedTargets{}) require.NoError(t, err) clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 4f12f19933cd..57438555a1f8 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -466,52 +466,27 @@ func buildKafkaConfig(u sinkURL, opts map[string]string) (*sarama.Config, error) saslMechanism string }{} - consumeBool := func(param string, dest *bool) (wasSet bool, err error) { - if paramVal := u.consumeParam(param); paramVal != "" { - wasSet, err := strToBool(paramVal, dest) - if err != nil { - return false, errors.Wrapf(err, "param %s must be a bool", param) - } - return wasSet, err - } - return false, nil - } - - decodeBase64 := func(param string, dest *[]byte) error { - // TODO(dan): There's a straightforward and unambiguous transformation - // between the base 64 encoding defined in RFC 4648 and the URL variant - // defined in the same RFC: simply replace all `+` with `-` and `/` with - // `_`. Consider always doing this for the user and accepting either - // variant. 
- val := u.consumeParam(param) - err := decodeBase64FromString(val, dest) - if err != nil { - return errors.Wrapf(err, `param %s must be base 64 encoded`, param) - } - return nil - } - - if _, err := consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil { + if _, err := u.consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil { return nil, err } - if _, err := consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil { + if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil { return nil, err } - if err := decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil { + if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil { return nil, err } - if err := decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil { + if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil { return nil, err } - if err := decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil { + if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil { return nil, err } - if _, err := consumeBool(changefeedbase.SinkParamSASLEnabled, &dialConfig.saslEnabled); err != nil { + if _, err := u.consumeBool(changefeedbase.SinkParamSASLEnabled, &dialConfig.saslEnabled); err != nil { return nil, err } - if wasSet, err := consumeBool(changefeedbase.SinkParamSASLHandshake, &dialConfig.saslHandshake); !wasSet && err == nil { + if wasSet, err := u.consumeBool(changefeedbase.SinkParamSASLHandshake, &dialConfig.saslHandshake); !wasSet && err == nil { dialConfig.saslHandshake = true } else { if err != nil { diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go new file mode 100644 index 000000000000..9b0391e115cd --- /dev/null +++ 
b/pkg/ccl/changefeedccl/sink_webhook.go @@ -0,0 +1,262 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/errors" +) + +const ( + applicationTypeJSON = `application/json` + authorizationHeader = `Authorization` + defaultConnTimeout = 3 * time.Second +) + +func isWebhookSink(u *url.URL) bool { + switch u.Scheme { + // allow HTTP here but throw an error later to make it clear HTTPS is required + case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: + return true + default: + return false + } +} + +type webhookSinkPayload struct { + Payload []json.RawMessage `json:"payload"` +} + +func encodePayloadWebhook(value []byte) ([]byte, error) { + payload := json.RawMessage(value) + // the 'payload' field has an array as a value to support + // batched rows in the future + body := &webhookSinkPayload{ + Payload: []json.RawMessage{payload}, + } + j, err := json.Marshal(body) + if err != nil { + return nil, err + } + return j, err +} + +type webhookSink struct { + ctx context.Context + url sinkURL + authHeader string + client *httputil.Client + cancelFunc func() +} + +func makeWebhookSink(ctx context.Context, u sinkURL, opts map[string]string) (Sink, error) { + if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS { + return nil, errors.Errorf(`this sink requires %s`, 
changefeedbase.SinkSchemeHTTPS) + } + u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`) + + switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) { + case changefeedbase.OptFormatJSON: + // only JSON supported at this time for webhook sink + default: + return nil, errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptFormat, opts[changefeedbase.OptFormat]) + } + + switch changefeedbase.EnvelopeType(opts[changefeedbase.OptEnvelope]) { + case changefeedbase.OptEnvelopeWrapped: + default: + return nil, errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptEnvelope, opts[changefeedbase.OptEnvelope]) + } + + if _, ok := opts[changefeedbase.OptKeyInValue]; !ok { + return nil, errors.Errorf(`this sink requires the WITH %s option`, changefeedbase.OptKeyInValue) + } + + if _, ok := opts[changefeedbase.OptTopicInValue]; !ok { + return nil, errors.Errorf(`this sink requires the WITH %s option`, changefeedbase.OptTopicInValue) + } + + var connTimeout time.Duration + if timeout, ok := opts[changefeedbase.OptWebhookClientTimeout]; ok { + var err error + connTimeout, err = time.ParseDuration(timeout) + if err != nil { + return nil, errors.Wrapf(err, "problem parsing option %s", changefeedbase.OptWebhookClientTimeout) + } else if connTimeout <= time.Duration(0) { + return nil, fmt.Errorf("option %s must be a positive duration", changefeedbase.OptWebhookClientTimeout) + } + } else { + connTimeout = defaultConnTimeout + } + + ctx, cancel := context.WithCancel(ctx) + + sink := &webhookSink{ + ctx: ctx, + cancelFunc: cancel, + authHeader: opts[changefeedbase.OptWebhookAuthHeader], + } + + var err error + sink.client, err = makeWebhookClient(u, connTimeout) + if err != nil { + return nil, err + } + + // remove known query params from sink URL before setting in sink config + sinkURLParsed, err := url.Parse(u.String()) + if err != nil { + return nil, err + } + params := sinkURLParsed.Query() + 
params.Del(changefeedbase.SinkParamSkipTLSVerify) + params.Del(changefeedbase.SinkParamCACert) + sinkURLParsed.RawQuery = params.Encode() + sink.url = sinkURL{URL: sinkURLParsed} + + return sink, nil +} + +func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { + client := &httputil.Client{ + Client: &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DialContext: (&net.Dialer{Timeout: timeout}).DialContext, + }, + }, + } + + dialConfig := struct { + tlsSkipVerify bool + caCert []byte + clientCert []byte + clientKey []byte + }{} + + transport := client.Transport.(*http.Transport) + + if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil { + return nil, err + } + + transport.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: dialConfig.tlsSkipVerify, + } + + if dialConfig.caCert != nil { + caCertPool, err := x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "could not load system root CA pool") + } + if caCertPool == nil { + caCertPool = x509.NewCertPool() + } + if !caCertPool.AppendCertsFromPEM(dialConfig.caCert) { + return nil, errors.Errorf("failed to parse certificate data:%s", string(dialConfig.caCert)) + } + transport.TLSClientConfig.RootCAs = caCertPool + } + + return client, nil +} + +// Dial is a no-op for this sink since we don't necessarily have +// a "health check" endpoint to use. 
+func (s *webhookSink) Dial() error { + return nil +} + +func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody)) + if err != nil { + return err + } + req.Header.Set("Content-Type", applicationTypeJSON) + if s.authHeader != "" { + req.Header.Set(authorizationHeader, s.authHeader) + } + + var res *http.Response + res, err = s.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) { + resBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode) + } + return fmt.Errorf("%s: %s", res.Status, string(resBody)) + } + return nil +} + +func (s *webhookSink) EmitRow( + ctx context.Context, _ TopicDescriptor, _, value []byte, _ hlc.Timestamp, +) error { + j, err := encodePayloadWebhook(value) + if err != nil { + return err + } + + return s.sendMessage(ctx, j) +} + +func (s *webhookSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + j, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved) + if err != nil { + return err + } + + return s.sendMessage(ctx, j) +} + +// Flush() is a no-op for now since calls to EmitRow() are synchronous +func (s *webhookSink) Flush(ctx context.Context) error { + return nil +} + +func (s *webhookSink) Close() error { + s.cancelFunc() + s.client.CloseIdleConnections() + return nil +} + +// redactWebhookAuthHeader redacts sensitive information from `auth`, which +// should be the value of the HTTP header `Authorization:`. The entire header +// should be redacted here. Wrapped in a function so we can change the +// redaction strategy if needed. 
+func redactWebhookAuthHeader(_ string) string { + return "redacted" +} diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go new file mode 100644 index 000000000000..11b8127cfef4 --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -0,0 +1,273 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "fmt" + "net/http" + "net/url" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/stretchr/testify/require" +) + +func getGenericWebhookSinkOptions() map[string]string { + opts := make(map[string]string) + opts[changefeedbase.OptFormat] = string(changefeedbase.OptFormatJSON) + opts[changefeedbase.OptKeyInValue] = `` + opts[changefeedbase.OptEnvelope] = string(changefeedbase.OptEnvelopeWrapped) + opts[changefeedbase.OptTopicInValue] = `` + return opts +} + +func setupWebhookSinkWithDetails(details jobspb.ChangefeedDetails) (Sink, error) { + serverCfg := &execinfra.ServerConfig{Settings: cluster.MakeTestingClusterSettings()} + sinkSrc, err := getSink(context.Background(), serverCfg, details, nil, + security.SQLUsername{}, mon.BoundAccount{}, 0) + if err != nil { + return nil, err + } + + return sinkSrc, 
nil +} + +// general happy path for webhook sink +func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWebhookSink) { + ctx := context.Background() + + // test an insert row entry + err := sinkSrc.EmitRow(ctx, nil, []byte("[1001]"), + []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + if err != nil { + t.Fatal(err) + } + err = sinkSrc.Flush(ctx) + if err != nil { + t.Fatal(err) + } + require.Equal(t, + "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}]}", sinkDest.Latest(), + "sink %s expected to receive message %s", sinkDest.URL(), + "{\"payload\":[{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}]}") + + // test a delete row entry + err = sinkSrc.EmitRow(ctx, nil, []byte("[1002]"), []byte("{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + if err != nil { + t.Fatal(err) + } + err = sinkSrc.Flush(ctx) + if err != nil { + t.Fatal(err) + } + require.Equal(t, + "{\"payload\":[{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}]}", sinkDest.Latest(), + "sink %s expected to receive message %s", sinkDest.URL(), + "{\"payload\":[{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}]}") +} + +func TestWebhookSink(t *testing.T) { + defer leaktest.AfterTest(t)() + + cert, certEncoded, err := cdctest.NewCACertBase64Encoded() + if err != nil { + t.Fatal(err) + } + sinkDest, err := cdctest.StartMockWebhookSink(cert) + if err != nil { + t.Fatal(err) + } + + opts := getGenericWebhookSinkOptions() + + sinkDestHost, err := url.Parse(sinkDest.URL()) + if err != nil { + t.Fatal(err) + } + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamCACert, certEncoded) + sinkDestHost.RawQuery = params.Encode() + + details := jobspb.ChangefeedDetails{ + SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()), + Opts: opts, + } + + sinkSrc, err := setupWebhookSinkWithDetails(details) + if err != 
nil { + t.Fatal(err) + } + + // sink with client accepting server cert should pass + testSendAndReceiveRows(t, sinkSrc, sinkDest) + + params.Del(changefeedbase.SinkParamCACert) + sinkDestHost.RawQuery = params.Encode() + details.SinkURI = fmt.Sprintf("webhook-%s", sinkDestHost.String()) + sinkSrcNoCert, err := setupWebhookSinkWithDetails(details) + if err != nil { + t.Fatal(err) + } + + // now sink's client accepts no custom certs, should reject the server's cert and fail + err = sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), + []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + require.EqualError(t, err, fmt.Sprintf(`Post "%s": x509: certificate signed by unknown authority`, sinkDest.URL())) + + err = sinkSrcNoCert.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + + params.Set(changefeedbase.SinkParamSkipTLSVerify, "true") + sinkDestHost.RawQuery = params.Encode() + details.SinkURI = fmt.Sprintf("webhook-%s", sinkDestHost.String()) + sinkSrcInsecure, err := setupWebhookSinkWithDetails(details) + if err != nil { + t.Fatal(err) + } + + // client should allow unrecognized certs and pass + testSendAndReceiveRows(t, sinkSrcInsecure, sinkDest) + + // sink should throw an error if a non-2XX status code is returned + sinkDest.SetStatusCode(http.StatusBadGateway) + err = sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), + []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + require.EqualError(t, err, "502 Bad Gateway: ") + + err = sinkSrc.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + // sink should throw an error if server is unreachable + sinkDest.Close() + err = sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), + []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + require.Error(t, err) + require.Contains(t, err.Error(), 
fmt.Sprintf(`Post "%s":`, sinkDest.URL())) + + err = sinkSrc.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + + err = sinkSrc.Close() + if err != nil { + t.Fatal(err) + } + err = sinkSrcNoCert.Close() + if err != nil { + t.Fatal(err) + } + err = sinkSrcInsecure.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestWebhookSinkWithAuthOptions(t *testing.T) { + defer leaktest.AfterTest(t)() + + cert, _, err := cdctest.NewCACertBase64Encoded() + if err != nil { + t.Fatal(err) + } + + username := "crl-user" + password := "crl-pwd" + var authHeader string + cdctest.EncodeBase64ToString([]byte(fmt.Sprintf("%s:%s", username, password)), &authHeader) + + sinkDest, err := cdctest.StartMockWebhookSinkWithBasicAuth(cert, username, password) + if err != nil { + t.Fatal(err) + } + + opts := getGenericWebhookSinkOptions() + opts[changefeedbase.OptWebhookAuthHeader] = fmt.Sprintf("Basic %s", authHeader) + + sinkDestHost, err := url.Parse(sinkDest.URL()) + if err != nil { + t.Fatal(err) + } + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamSkipTLSVerify, "true") + sinkDestHost.RawQuery = params.Encode() + + details := jobspb.ChangefeedDetails{ + SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()), + Opts: opts, + } + + sinkSrc, err := setupWebhookSinkWithDetails(details) + if err != nil { + t.Fatal(err) + } + + testSendAndReceiveRows(t, sinkSrc, sinkDest) + + // no credentials should result in a 401 + delete(opts, changefeedbase.OptWebhookAuthHeader) + sinkSrcNoCreds, err := setupWebhookSinkWithDetails(details) + if err != nil { + t.Fatal(err) + } + err = sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), + []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + require.EqualError(t, err, "401 Unauthorized: ") + + err = sinkSrcNoCreds.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + + // wrong credentials should result in a 401 as well + var 
wrongAuthHeader string + cdctest.EncodeBase64ToString([]byte(fmt.Sprintf("%s:%s", username, "wrong-password")), &wrongAuthHeader) + opts[changefeedbase.OptWebhookAuthHeader] = fmt.Sprintf("Basic %s", wrongAuthHeader) + sinkSrcWrongCreds, err := setupWebhookSinkWithDetails(details) + if err != nil { + t.Fatal(err) + } + + err = sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), + []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}) + require.EqualError(t, err, "401 Unauthorized: ") + + err = sinkSrcWrongCreds.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + + err = sinkSrc.Close() + if err != nil { + t.Fatal(err) + } + err = sinkSrcNoCreds.Close() + if err != nil { + t.Fatal(err) + } + err = sinkSrcWrongCreds.Close() + if err != nil { + t.Fatal(err) + } + sinkDest.Close() +} diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index d891faa24256..9903011d77ec 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1200,3 +1200,180 @@ func (k *kafkaFeed) Close() error { } return errors.CombineErrors(k.jobFeed.Close(), k.tg.wait()) } + +type webhookFeedFactory struct { + enterpriseFeedFactory +} + +var _ cdctest.TestFeedFactory = (*webhookFeedFactory)(nil) + +// makeWebhookFeedFactory returns a TestFeedFactory implementation using the `webhook-webhooks` uri. 
+func makeWebhookFeedFactory( + srv serverutils.TestServerInterface, db *gosql.DB, +) cdctest.TestFeedFactory { + return &webhookFeedFactory{ + enterpriseFeedFactory: enterpriseFeedFactory{ + s: srv, + db: db, + di: newDepInjector(srv), + }, + } +} + +func (f *webhookFeedFactory) Feed(create string, args ...interface{}) (cdctest.TestFeed, error) { + parsed, err := parser.ParseOne(create) + if err != nil { + return nil, err + } + createStmt := parsed.AST.(*tree.CreateChangefeed) + + cert, _, err := cdctest.NewCACertBase64Encoded() + if err != nil { + return nil, err + } + sinkDest, err := cdctest.StartMockWebhookSink(cert) + if err != nil { + return nil, err + } + + if createStmt.SinkURI == nil { + createStmt.SinkURI = tree.NewStrVal( + fmt.Sprintf("webhook-%s?insecure_tls_skip_verify=true", sinkDest.URL())) + } + ss := &sinkSynchronizer{} + wrapSink := func(s Sink) Sink { + return ¬ifyFlushSink{Sink: s, sync: ss} + } + + c := &webhookFeed{ + jobFeed: newJobFeed(f.db, wrapSink), + seenTrackerMap: make(map[string]struct{}), + ss: ss, + mockSink: sinkDest, + } + if err := f.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil { + sinkDest.Close() + return nil, err + } + return c, nil +} + +func (f *webhookFeedFactory) Server() serverutils.TestServerInterface { + return f.s +} + +type webhookFeed struct { + *jobFeed + seenTrackerMap + ss *sinkSynchronizer + mockSink *cdctest.MockWebhookSink +} + +var _ cdctest.TestFeed = (*webhookFeed)(nil) + +// Partitions implements TestFeed +func (f *webhookFeed) Partitions() []string { + return []string{``} +} + +// isResolvedTimestamp determines if the given JSON message is a resolved timestamp message. 
+func isResolvedTimestamp(message []byte) (bool, error) { + parsed := make(map[string]interface{}) + if err := gojson.Unmarshal(message, &parsed); err != nil { + return false, err + } + _, ok := parsed[`resolved`] + return ok, nil +} + +// extractTopicFromJSONValue extracts the `WITH topic_in_value` topic from a `WITH +// format=json, envelope=wrapped` value. +func extractTopicFromJSONValue(wrapped []byte) (topic string, value []byte, _ error) { + parsed := make(map[string]interface{}) + if err := gojson.Unmarshal(wrapped, &parsed); err != nil { + return "", nil, err + } + topicParsed := parsed[`topic`] + delete(parsed, `topic`) + + topic = fmt.Sprintf("%v", topicParsed) + var err error + if value, err = reformatJSON(parsed); err != nil { + return "", nil, err + } + return topic, value, nil +} + +// extractValueFromJSONMessage extracts the value of the first element of +// the payload array from an webhook sink JSON message. +func extractValueFromJSONMessage(message []byte) ([]byte, error) { + parsed := make(map[string][]interface{}) + if err := gojson.Unmarshal(message, &parsed); err != nil { + return nil, err + } + keyParsed := parsed[`payload`] + if len(keyParsed) <= 0 { + return nil, fmt.Errorf("payload value in json message contains no elements") + } + + var err error + var value []byte + if value, err = reformatJSON(keyParsed[0]); err != nil { + return nil, err + } + return value, nil +} + +// Next implements TestFeed +func (f *webhookFeed) Next() (*cdctest.TestFeedMessage, error) { + for { + msg := f.mockSink.Pop() + if msg != "" { + m := &cdctest.TestFeedMessage{} + if msg != "" { + var err error + var resolved bool + resolved, err = isResolvedTimestamp([]byte(msg)) + if err != nil { + return nil, err + } + if resolved { + m.Resolved = []byte(msg) + } else { + wrappedValue, err := extractValueFromJSONMessage([]byte(msg)) + if err != nil { + return nil, err + } + if m.Key, m.Value, err = extractKeyFromJSONValue([]byte(wrappedValue)); err != nil { + return 
nil, err + } + if m.Topic, m.Value, err = extractTopicFromJSONValue(m.Value); err != nil { + return nil, err + } + if isNew := f.markSeen(m); !isNew { + continue + } + } + return m, nil + } + m.Key, m.Value = nil, nil + return m, nil + } + + select { + case <-f.ss.eventReady(): + case <-f.shutdown: + return nil, f.terminalJobError() + } + } +} + +// Close implements TestFeed +func (f *webhookFeed) Close() error { + err := f.jobFeed.Close() + if err != nil { + return err + } + f.mockSink.Close() + return nil +} diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 2e1eef56f289..dee5f208e0c5 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -130,6 +130,7 @@ go_library( deps = [ "//pkg/base", "//pkg/ccl/changefeedccl/cdctest", + "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/cli", "//pkg/cmd/cmpconn", "//pkg/cmd/internal/issues", diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 2c480a75bec7..241e6b072d46 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -22,6 +22,7 @@ import ( "fmt" "math/big" "net" + "net/url" "path/filepath" "regexp" "sort" @@ -32,6 +33,7 @@ import ( "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/logger" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" @@ -52,6 +54,13 @@ const ( ledgerWorkloadType workloadType = "ledger" ) +type sinkType int32 + +const ( + cloudStorageSink sinkType = iota + 1 + webhookSink +) + type cdcTestArgs struct { workloadType workloadType tpccWarehouseCount int @@ -59,7 +68,7 @@ type cdcTestArgs struct { initialScan bool kafkaChaos bool crdbChaos bool - cloudStorageSink bool + whichSink sinkType sinkURI string targetInitialScanLatency time.Duration @@ -89,11 +98,32 @@ func 
cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcT var sinkURI string if args.sinkURI != "" { sinkURI = args.sinkURI - } else if args.cloudStorageSink { + } else if args.whichSink == cloudStorageSink { ts := timeutil.Now().Format(`20060102150405`) // cockroach-tmp is a multi-region bucket with a TTL to clean up old // data. sinkURI = `experimental-gs://cockroach-tmp/roachtest/` + ts + "?AUTH=implicit" + } else if args.whichSink == webhookSink { + // setup a sample cert for use by the mock sink + cert, certEncoded, err := cdctest.NewCACertBase64Encoded() + if err != nil { + t.Fatal(err) + } + sinkDest, err := cdctest.StartMockWebhookSink(cert) + if err != nil { + t.Fatal(err) + } + + sinkDestHost, err := url.Parse(sinkDest.URL()) + if err != nil { + t.Fatal(err) + } + + params := sinkDestHost.Query() + params.Set(changefeedbase.SinkParamCACert, certEncoded) + sinkDestHost.RawQuery = params.Encode() + + sinkURI = fmt.Sprintf("webhook-%s", sinkDestHost.String()) } else { t.Status("installing kafka") kafka.install(ctx) @@ -723,12 +753,33 @@ func registerCDC(r *testRegistry) { tpccWarehouseCount: 50, workloadDuration: "30m", initialScan: true, - cloudStorageSink: true, + whichSink: cloudStorageSink, targetInitialScanLatency: 30 * time.Minute, targetSteadyLatency: time.Minute, }) }, }) + // TODO(ryan min): uncomment once parallelism is implemented for webhook + // sink, currently fails with "initial scan did not complete" + /* + r.Add(testSpec{ + Name: "cdc/webhook-sink", + Owner: OwnerCDC, + Cluster: r.makeClusterSpec(4, spec.CPU(16)), + RequiresLicense: true, + Run: func(ctx context.Context, t *test, c Cluster) { + cdcBasicTest(ctx, t, c, cdcTestArgs{ + workloadType: tpccWorkloadType, + tpccWarehouseCount: 100, + workloadDuration: "30m", + initialScan: true, + whichSink: webhookSink, + targetInitialScanLatency: 30 * time.Minute, + targetSteadyLatency: time.Minute, + }) + }, + }) + */ r.Add(TestSpec{ Name: "cdc/kafka-auth", Owner: `cdc`, @@ 
-1573,7 +1624,7 @@ func createChangefeed(db *gosql.DB, targets, sinkURL string, args cdcTestArgs) ( var jobID int createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR %s INTO $1`, targets) extraArgs := []interface{}{sinkURL} - if args.cloudStorageSink { + if args.whichSink == cloudStorageSink || args.whichSink == webhookSink { createStmt += ` WITH resolved='10s', envelope=wrapped` } else { createStmt += ` WITH resolved`