Skip to content

Commit

Permalink
Merge #66497
Browse files Browse the repository at this point in the history
66497: changefeedccl: webhook sink implementation r=spiffyyeng a=spiffyyeng

Adds support for a new enterprise webhook-style HTTP sink that sends
JSON POST payloads for every row and resolved timestamp emitted by the
changefeed. Configurable with new options and query params as mentioned
in the corresponding RFC.

Resolves #65816

Release note (enterprise change): Introduces new HTTP sink (prefix
'webhook-https') to send individual changefeed messages as webhook
events.

RFC PR: #65927 

Co-authored-by: Ryan Min <[email protected]>
  • Loading branch information
craig[bot] and spiffyy99 committed Jun 28, 2021
2 parents 52f87b5 + e40e737 commit d43d9fd
Show file tree
Hide file tree
Showing 23 changed files with 1,250 additions and 133 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"sink_cloudstorage.go",
"sink_kafka.go",
"sink_sql.go",
"sink_webhook.go",
"testing_knobs.go",
"tls.go",
],
Expand Down Expand Up @@ -121,6 +122,7 @@ go_test(
"show_changefeed_jobs_test.go",
"sink_cloudstorage_test.go",
"sink_test.go",
"sink_webhook_test.go",
"testfeed_test.go",
"validations_test.go",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func createBenchmarkChangefeed(
},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts)
encoder, err := makeJSONEncoder(details.Opts, details.Targets)
if err != nil {
return nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "cdctest",
srcs = [
"mock_webhook_sink.go",
"nemeses.go",
"schema_registry.go",
"testfeed.go",
Expand Down
145 changes: 145 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/mock_webhook_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdctest

import (
"crypto/tls"
"io/ioutil"
"net/http"
"net/http/httptest"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// MockWebhookSink is the Webhook sink used in tests.
type MockWebhookSink struct {
basicAuth bool
username, password string
server *httptest.Server
mu struct {
statusCode int
rows []string
syncutil.Mutex
}
}

// StartMockWebhookSink creates and starts a mock webhook sink for tests.
func StartMockWebhookSink(certificate *tls.Certificate) (*MockWebhookSink, error) {
s := makeMockWebhookSink()
if certificate != nil {
s.server.TLS = &tls.Config{
Certificates: []tls.Certificate{*certificate},
}
}
s.server.StartTLS()
return s, nil
}

// StartMockWebhookSinkWithBasicAuth creates and starts a mock webhook sink for
// tests with basic username/password auth.
func StartMockWebhookSinkWithBasicAuth(
certificate *tls.Certificate, username, password string,
) (*MockWebhookSink, error) {
s := makeMockWebhookSink()
s.basicAuth = true
s.username = username
s.password = password
if certificate != nil {
s.server.TLS = &tls.Config{
Certificates: []tls.Certificate{*certificate},
}
}
s.server.StartTLS()
return s, nil
}

func makeMockWebhookSink() *MockWebhookSink {
s := &MockWebhookSink{}
s.mu.statusCode = http.StatusOK
s.server = httptest.NewUnstartedServer(http.HandlerFunc(s.requestHandler))
return s
}

// URL returns the http address of this mock Webhook sink.
func (s *MockWebhookSink) URL() string {
return s.server.URL
}

// SetStatusCode sets the HTTP status code to use when responding to a request.
// Useful for testing error handling behavior on client side.
func (s *MockWebhookSink) SetStatusCode(statusCode int) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.statusCode = statusCode
}

// Close closes the mock Webhook sink.
func (s *MockWebhookSink) Close() {
s.server.Close()
s.server.CloseClientConnections()
}

// Latest returns the most recent message received by the MockWebhookSink.
func (s *MockWebhookSink) Latest() string {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.rows) == 0 {
return ""
}
latest := s.mu.rows[len(s.mu.rows)-1]
return latest
}

// Pop deletes and returns the oldest message from MockWebhookSink
func (s *MockWebhookSink) Pop() string {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.rows) > 0 {
oldest := s.mu.rows[0]
s.mu.rows = s.mu.rows[1:]
return oldest
}
return ""
}

func (s *MockWebhookSink) requestHandler(hw http.ResponseWriter, hr *http.Request) {
method := hr.Method

var err error
switch {
case method == http.MethodPost:
if s.basicAuth {
username, password, ok := hr.BasicAuth()
if !ok || s.username != username || s.password != password {
hw.WriteHeader(http.StatusUnauthorized)
return
}
}
err = s.publish(hw, hr)
default:
hw.WriteHeader(http.StatusNotFound)
return
}
if err != nil {
http.Error(hw, err.Error(), http.StatusInternalServerError)
}
}

func (s *MockWebhookSink) publish(hw http.ResponseWriter, hr *http.Request) error {
defer hr.Body.Close()
row, err := ioutil.ReadAll(hr.Body)
if err != nil {
return err
}
s.mu.Lock()
s.mu.rows = append(s.mu.rows, string(row))
s.mu.Unlock()
hw.WriteHeader(s.mu.statusCode)
return nil
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type TestFeed interface {
// by Next.
Partitions() []string
// Next returns the next message. Within a given topic+partition, the order is
// preserved, but not otherwisete. Either len(key) and len(value) will be
// preserved, but not otherwise. Either len(key) and len(value) will be
// greater than zero (a row updated) or len(payload) will be (a resolved
// timestamp).
Next() (*TestFeedMessage, error)
Expand Down
37 changes: 37 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/tls_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdctest
import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
Expand All @@ -34,6 +35,42 @@ func EncodeBase64ToString(src []byte, dest *string) {
}
}

// NewCACertBase64Encoded generates a new CA cert and returns the
// cert object as well as a base 64 encoded PEM version.
func NewCACertBase64Encoded() (*tls.Certificate, string, error) {
keyLength := 2048

caKey, err := rsa.GenerateKey(rand.Reader, keyLength)
if err != nil {
return nil, "", errors.Wrap(err, "CA private key")
}

caCert, _, err := GenerateCACert(caKey)
if err != nil {
return nil, "", errors.Wrap(err, "CA cert gen")
}

caKeyPEM, err := PemEncodePrivateKey(caKey)
if err != nil {
return nil, "", errors.Wrap(err, "pem encode CA key")
}

caCertPEM, err := PemEncodeCert(caCert)
if err != nil {
return nil, "", errors.Wrap(err, "pem encode CA cert")
}

cert, err := tls.X509KeyPair([]byte(caCertPEM), []byte(caKeyPEM))
if err != nil {
return nil, "", errors.Wrap(err, "CA cert parse from PEM")
}

var caCertBase64 string
EncodeBase64ToString([]byte(caCertPEM), &caCertBase64)

return &cert, caCertBase64, nil
}

// GenerateCACert generates a new self-signed CA cert using priv
func GenerateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) {
serial, err := randomSerial()
Expand Down
28 changes: 19 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,18 @@ func changefeedPlanHook(
// and `format` if the user didn't specify them.
// - Then `getEncoder` is run to return any configuration errors.
// - Then the changefeed is opted in to `OptKeyInValue` for any cloud
// storage sink. Kafka etc have a key and value field in each message but
// cloud storage sinks don't have anywhere to put the key. So if the key
// is not in the value, then for DELETEs there is no way to recover which
// key was deleted. We could make the user explicitly pass this option for
// every cloud storage sink and error if they don't, but that seems
// user-hostile for insufficient reason. We can't do this any earlier,
// because we might return errors about `key_in_value` being incompatible
// which is confusing when the user didn't type that option.
// storage sink or webhook sink. Kafka etc have a key and value field in
// each message but cloud storage sinks and webhook sinks don't have
// anywhere to put the key. So if the key is not in the value, then for
// DELETEs there is no way to recover which key was deleted. We could make
// the user explicitly pass this option for every cloud storage sink/
// webhook sink and error if they don't, but that seems user-hostile for
// insufficient reason. We can't do this any earlier, because we might
// return errors about `key_in_value` being incompatible which is
// confusing when the user didn't type that option.
// This is the same for the topic and webhook sink, which uses
// `topic_in_value` to embed the topic in the value by default, since it
// has no other avenue to express the topic.
// - Finally, we create a "canary" sink to test sink configuration and
// connectivity. This has to go last because it is strange to return sink
// connectivity errors before we've finished validating all the other
Expand All @@ -270,9 +274,12 @@ func changefeedPlanHook(
if _, err := getEncoder(ctx, details.Opts, details.Targets); err != nil {
return err
}
if isCloudStorageSink(parsedSink) {
if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) {
details.Opts[changefeedbase.OptKeyInValue] = ``
}
if isWebhookSink(parsedSink) {
details.Opts[changefeedbase.OptTopicInValue] = ``
}

if !unspecifiedSink && p.ExecCfg().ExternalIODirConfig.DisableOutbound {
return errors.Errorf("Outbound IO is disabled by configuration, cannot create changefeed into %s", parsedSink.Scheme)
Expand Down Expand Up @@ -424,6 +431,9 @@ func changefeedJobDescription(
SinkURI: tree.NewDString(cleanedSinkURI),
}
for k, v := range opts {
if k == changefeedbase.OptWebhookAuthHeader {
v = redactWebhookAuthHeader(v)
}
opt := tree.KVOption{Key: tree.Name(k)}
if len(v) > 0 {
opt.Value = tree.NewDString(v)
Expand Down
Loading

0 comments on commit d43d9fd

Please sign in to comment.