Skip to content

Commit

Permalink
Merge #99833
Browse files Browse the repository at this point in the history
99833: changefeedccl: reduce duplicate schema registrations r=[miretskiy] a=HonoreDB

When running many changefeeds, and/or changefeeds with many parallel processors, it's possible to overwhelm the schema registry. This PR moves the number of calls to closer to O(number of nodes) by adding a shared in-memory cache by endpoint, subject, and schema.

Informs #99221.

Release note (enterprise change): Changefeeds using the WITH confluent_schema_registry option will make fewer duplicate schema registrations.

Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
craig[bot] and HonoreDB committed Apr 6, 2023
2 parents d618a1d + 6660958 commit 7de717e
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 9 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func (r *SchemaRegistry) registerSchema(subject string, schema string) int32 {
return id
}

// RegistrationCount returns the number of Registration requests received.
func (r *SchemaRegistry) RegistrationCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return int(r.mu.idAlloc)
}

var (
// We are slightly stricter than confluent here as they allow
// a trailing slash.
Expand Down
95 changes: 88 additions & 7 deletions pkg/ccl/changefeedccl/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -118,7 +120,7 @@ func getAndDeleteParams(u *url.URL) (*schemaRegistryParams, error) {

func newConfluentSchemaRegistry(
baseURL string, p externalConnectionProvider, sliMetrics *sliMetrics,
) (*confluentSchemaRegistry, error) {
) (schemaRegistry, error) {
u, err := url.Parse(baseURL)
if err != nil {
return nil, errors.Wrap(err, "malformed schema registry url")
Expand All @@ -136,6 +138,18 @@ func newConfluentSchemaRegistry(
return nil, errors.Errorf("unsupported scheme: %q", u.Scheme)
}

schemaRegistrySingletons.mu.Lock()
src, ok := schemaRegistrySingletons.cachePerEndpoint[baseURL]
if !ok {
src = &schemaRegistryCache{entries: cache.NewUnorderedCache(
cache.Config{Policy: cache.CacheLRU, ShouldEvict: func(size int, _, _ interface{}) bool {
return size > 1023
}}),
}
schemaRegistrySingletons.cachePerEndpoint[baseURL] = src
}
schemaRegistrySingletons.mu.Unlock()

s, err := getAndDeleteParams(u)
if err != nil {
return nil, err
Expand All @@ -148,12 +162,16 @@ func newConfluentSchemaRegistry(

retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 5
return &confluentSchemaRegistry{
baseURL: u,
client: httpClient,
retryOpts: retryOpts,
sliMetrics: sliMetrics,
}, nil
reg := schemaRegistryWithCache{
base: &confluentSchemaRegistry{
baseURL: u,
client: httpClient,
retryOpts: retryOpts,
sliMetrics: sliMetrics,
},
cache: src,
}
return &reg, nil
}

// Setup the httputil.Client to use when dialing Confluent schema registry. If `ca_cert`
Expand Down Expand Up @@ -292,3 +310,66 @@ func (r *confluentSchemaRegistry) urlForPath(relPath string) string {
u.Path = path.Join(u.EscapedPath(), relPath)
return u.String()
}

type schemaRegistryCacheKey struct {
subject string
schema string
}

type schemaRegistryCache struct {
mu syncutil.Mutex
// cache[schemaRegistryCacheKey]registeredSchemaID
entries *cache.UnorderedCache
}

// Get returns the already-registered id for this key if present, and
// a bool indicating a hit or miss.
func (src *schemaRegistryCache) Get(key schemaRegistryCacheKey) (int32, bool) {
v, ok := src.entries.Get(key)
if ok {
return v.(int32), true
}
return 0, false
}

// Add caches a registered schema id.
func (src *schemaRegistryCache) Add(key schemaRegistryCacheKey, id int32) {
src.entries.Add(key, id)
}

type schemaRegistryWithCache struct {
base schemaRegistry
cache *schemaRegistryCache
}

// Ping implements the schemaRegistry interface.
func (csr *schemaRegistryWithCache) Ping(ctx context.Context) error {
return csr.base.Ping(ctx)
}

// RegisterSchemaForSubject implements the schemaRegistry interface.
func (csr *schemaRegistryWithCache) RegisterSchemaForSubject(
ctx context.Context, subject string, schema string,
) (int32, error) {
cacheKey := schemaRegistryCacheKey{
subject: subject, schema: schema,
}
csr.cache.mu.Lock()
defer csr.cache.mu.Unlock()
id, ok := csr.cache.Get(cacheKey)
if ok {
return id, nil
}
id, err := csr.base.RegisterSchemaForSubject(ctx, subject, schema)
if err == nil {
csr.cache.Add(cacheKey, id)
}
return id, err
}

type sharedSchemaRegistryCaches struct {
mu syncutil.Mutex
cachePerEndpoint map[string]*schemaRegistryCache
}

var schemaRegistrySingletons = &sharedSchemaRegistryCaches{cachePerEndpoint: make(map[string]*schemaRegistryCache)}
51 changes: 49 additions & 2 deletions pkg/ccl/changefeedccl/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ package changefeedccl
import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -42,7 +44,10 @@ func TestConfluentSchemaRegistry(t *testing.T) {
defer regServer.Close()
r, err := newConfluentSchemaRegistry(regServer.URL(), nil, nil)
require.NoError(t, err)
require.Equal(t, defaultSchemaRegistryTimeout, r.client.Timeout)
getTimeout := func(r schemaRegistry) time.Duration {
return r.(*schemaRegistryWithCache).base.(*confluentSchemaRegistry).client.Timeout
}
require.Equal(t, defaultSchemaRegistryTimeout, getTimeout(r))

// add explicit timeout param.
u, err := url.Parse(regServer.URL())
Expand All @@ -52,7 +57,7 @@ func TestConfluentSchemaRegistry(t *testing.T) {
u.RawQuery = values.Encode()
r, err = newConfluentSchemaRegistry(u.String(), nil, nil)
require.NoError(t, err)
require.Equal(t, 42*time.Millisecond, r.client.Timeout)
require.Equal(t, 42*time.Millisecond, getTimeout(r))
})
}

Expand Down Expand Up @@ -92,6 +97,48 @@ func TestConfluentSchemaRegistryExternalConnection(t *testing.T) {

}

func TestConfluentSchemaRegistrySharedCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

regServer := cdctest.StartTestSchemaRegistry()
defer regServer.Close()
require.Equal(t, 0, regServer.RegistrationCount())

var wg sync.WaitGroup

// Multiple registrations of the same schema hit a shared cache.
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
r, err := newConfluentSchemaRegistry(regServer.URL(), nil, nil)
require.NoError(t, err)
_, err = r.RegisterSchemaForSubject(context.Background(), "subject1", "schema")
require.NoError(t, err)
wg.Done()

}()
}
wg.Wait()
require.Equal(t, 1, regServer.RegistrationCount())

// Registrations of different schemas don't share a cache, even if the subject is the same.
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
r, err := newConfluentSchemaRegistry(regServer.URL(), nil, nil)
require.NoError(t, err)
_, err = r.RegisterSchemaForSubject(context.Background(), "subject1", fmt.Sprintf("schema1%d", i))
require.NoError(t, err)
wg.Done()

}(i)
}
wg.Wait()
require.Equal(t, 11, regServer.RegistrationCount())

}

func TestConfluentSchemaRegistryPing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 7de717e

Please sign in to comment.