Skip to content

Commit

Permalink
descs/collection: Added memory monitoring to Collection.kvDescriptors
Browse files Browse the repository at this point in the history
Collection struct agressively caches descriptors to avoid reading from
storage. Previously, we do not have memory monitoring on those cached
descriptors, so we risk OOM crashes.

This PR adds memory monitoring on one of the cache descriptor set -- the
kvDescriptors -- since it stores all descriptors it reads from storage
and it is done per transaction and not shared between session. This
means we would have one copy of all descriptors for each session opened
to the database, consuming potentially a large amount of memory when we
have a lot of descriptors.

One effort to support this memory monitoring is that we also added logic
in nstree.Catalog to keep track of the memory allocation in bytes its
underlying map (which consists of two maps for entries in the
descriptor and namespace tables) consumes.

Release note: None
  • Loading branch information
Xiang-Gu committed Apr 11, 2022
1 parent 5c1ae72 commit 7f16831
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ func getQualifiedTableName(
func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.MakeCollection(ctx, nil /* TemporarySchemaProvider */)
col := execCfg.CollectionFactory.MakeCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
if err != nil {
return tree.TableName{}, err
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

collectionFactory := descs.NewCollectionFactory(
ctx,
cfg.Settings,
leaseMgr,
virtualSchemas,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/retry",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -89,6 +90,7 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
Expand All @@ -105,6 +107,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//assert",
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand All @@ -48,14 +49,15 @@ func makeCollection(
systemNamespace *systemDatabaseNamespaceCache,
virtualSchemas catalog.VirtualSchemas,
temporarySchemaProvider TemporarySchemaProvider,
monitor *mon.BytesMonitor,
) Collection {
return Collection{
settings: settings,
version: settings.Version.ActiveVersion(ctx),
hydratedTables: hydratedTables,
virtual: makeVirtualDescriptors(virtualSchemas),
leased: makeLeasedDescriptors(leaseMgr),
kv: makeKVDescriptors(codec, systemNamespace),
kv: makeKVDescriptors(codec, systemNamespace, monitor),
temporary: makeTemporaryDescriptors(settings, codec, temporarySchemaProvider),
direct: makeDirect(ctx, codec, settings),
}
Expand Down Expand Up @@ -176,7 +178,7 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) {
func (tc *Collection) ReleaseAll(ctx context.Context) {
tc.ReleaseLeases(ctx)
tc.uncommitted.reset()
tc.kv.reset()
tc.kv.reset(ctx)
tc.synthetic.reset()
tc.deletedDescs = catalog.DescriptorIDSet{}
tc.skipValidationOnWrite = false
Expand Down
71 changes: 71 additions & 0 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ package descs_test
import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/lib/pq/oid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -597,3 +600,71 @@ func TestCollectionPreservesPostDeserializationChanges(t *testing.T) {
return nil
}))
}

// TestCollectionProperlyUsesMemoryMonitoring ensures that memory monitoring
// on Collection is working properly.
// Namely, we are currently only tracking memory usage on Collection.kvDescriptors
// since it reads all descriptors from storage, which can be huge.
//
// The testing strategy is to
// 1. Create tables that are very large into the database (so that when we read them
// into memory later with Collection, a lot of memory will be allocated and used).
// 2. Hook up a monitor with infinite budget to this Collection and invoke method
// so that this Collection reads all the descriptors into memory. With an unlimited
// monitor, this should succeed without error.
// 3. Change the monitor budget to something small. Repeat step 2 and expect an error
// being thrown out when reading all those descriptors into memory to validate the
// memory monitor indeed kicked in and had an effect.
func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
txn := tc.Server(0).DB().NewTxn(ctx, "test txn")

// Create a lot of descriptors.
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
numTblsToInsert := 100
for i := 0; i < numTblsToInsert; i++ {
tdb.Exec(t, fmt.Sprintf("CREATE TABLE table_%v()", i))
}

// Create a monitor to be used to track memory usage in a Collection.
monitor := mon.NewMonitor("test_monitor", mon.MemoryResource,
nil, nil, -1, 0, cluster.MakeTestingClusterSettings())

// Start the monitor with unlimited budget.
monitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))

// Create a `Collection` with monitor hooked up.
col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor)
require.Equal(t, int64(0), monitor.AllocBytes())

// Read all the descriptors into `col` and assert this read will finish without error.
_, err1 := col.GetAllDescriptors(ctx, txn)
require.NoError(t, err1)

// Clean up and assert the monitor's allocation is back to 0 properly after releasing.
allocatedMemoryInBytes := monitor.AllocBytes()
col.ReleaseAll(ctx)
require.Equal(t, int64(0), monitor.AllocBytes())

// Restart the monitor to a smaller budget (in fact, let's be bold by setting it to be only one byte below
// what has been allocated in the previous round).
monitor.Start(ctx, nil, mon.MakeStandaloneBudget(allocatedMemoryInBytes-1))
require.Equal(t, int64(0), monitor.AllocBytes())

// Repeat the process again and assert this time memory allocation will err out.
col = tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
MakeCollection(ctx, nil /* temporarySchemaProvider */, monitor)
_, err2 := col.GetAllDescriptors(ctx, txn)
require.Error(t, err2)

// Clean up
col.ReleaseAll(ctx)
require.Equal(t, int64(0), monitor.AllocBytes())
monitor.Stop(ctx)
}
18 changes: 15 additions & 3 deletions pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// CollectionFactory is used to construct a new Collection.
Expand All @@ -31,11 +32,13 @@ type CollectionFactory struct {
systemDatabase *systemDatabaseNamespaceCache
spanConfigSplitter spanconfig.Splitter
spanConfigLimiter spanconfig.Limiter
defaultMonitor *mon.BytesMonitor
}

// NewCollectionFactory constructs a new CollectionFactory which holds onto
// the node-level dependencies needed to construct a Collection.
func NewCollectionFactory(
ctx context.Context,
settings *cluster.Settings,
leaseMgr *lease.Manager,
virtualSchemas catalog.VirtualSchemas,
Expand All @@ -52,6 +55,9 @@ func NewCollectionFactory(
systemDatabase: newSystemDatabaseNamespaceCache(leaseMgr.Codec()),
spanConfigSplitter: spanConfigSplitter,
spanConfigLimiter: spanConfigLimiter,
defaultMonitor: mon.NewUnlimitedMonitor(ctx, "CollectionFactoryDefaultUnlimitedMonitor",
mon.MemoryResource, nil /* curCount */, nil, /* maxHist */
0 /* noteworthy */, settings),
}
}

Expand All @@ -68,16 +74,22 @@ func NewBareBonesCollectionFactory(

// MakeCollection constructs a Collection for the purposes of embedding.
func (cf *CollectionFactory) MakeCollection(
ctx context.Context, temporarySchemaProvider TemporarySchemaProvider,
ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor,
) Collection {
if monitor == nil {
// If an upstream monitor is not provided, the default, unlimited monitor will be used.
// All downstream resource allocation/releases on this default monitor will then be no-ops.
monitor = cf.defaultMonitor
}

return makeCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydratedTables, cf.systemDatabase,
cf.virtualSchemas, temporarySchemaProvider)
cf.virtualSchemas, temporarySchemaProvider, monitor)
}

// NewCollection constructs a new Collection.
func (cf *CollectionFactory) NewCollection(
ctx context.Context, temporarySchemaProvider TemporarySchemaProvider,
) *Collection {
c := cf.MakeCollection(ctx, temporarySchemaProvider)
c := cf.MakeCollection(ctx, temporarySchemaProvider, nil /* monitor */)
return &c
}
17 changes: 15 additions & 2 deletions pkg/sql/catalog/descs/kv_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -54,6 +55,10 @@ type kvDescriptors struct {
// observed.
// These are purged at the same time as allDescriptors.
allSchemasForDatabase map[descpb.ID]map[descpb.ID]string

// memAcc is the actual account of an injected, upstream monitor
// to track memory usage of kvDescriptors.
memAcc mon.BoundAccount
}

// allDescriptors is an abstraction to capture the complete set of descriptors
Expand Down Expand Up @@ -85,16 +90,18 @@ func (d *allDescriptors) contains(id descpb.ID) bool {
}

func makeKVDescriptors(
codec keys.SQLCodec, systemNamespace *systemDatabaseNamespaceCache,
codec keys.SQLCodec, systemNamespace *systemDatabaseNamespaceCache, monitor *mon.BytesMonitor,
) kvDescriptors {
return kvDescriptors{
codec: codec,
systemNamespace: systemNamespace,
memAcc: monitor.MakeBoundAccount(),
}
}

func (kd *kvDescriptors) reset() {
func (kd *kvDescriptors) reset(ctx context.Context) {
kd.releaseAllDescriptors()
kd.memAcc.Clear(ctx)
}

// releaseAllDescriptors releases the cached slice of all descriptors
Expand Down Expand Up @@ -262,6 +269,12 @@ func (kd *kvDescriptors) getAllDescriptors(
return nstree.Catalog{}, err
}

// Monitor memory usage of the catalog recently read from storage.
if err := kd.memAcc.Grow(ctx, c.ByteSize()); err != nil {
err = errors.Wrap(err, "Memory usage exceeds limit for kvDescriptors.allDescriptors.")
return nstree.Catalog{}, err
}

descs := c.OrderedDescriptors()
ve := c.Validate(ctx, version, catalog.ValidationReadTelemetry, catalog.ValidationLevelCrossReferences, descs...)
if err := ve.CombinedError(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cf *CollectionFactory) Txn(
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
modifiedDescriptors = nil
deletedDescs = catalog.DescriptorIDSet{}
descsCol = cf.MakeCollection(ctx, nil /* temporarySchemaProvider */)
descsCol = cf.MakeCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */)
defer descsCol.ReleaseAll(ctx)
if !cf.settings.Version.IsActive(
ctx, clusterversion.DisableSystemConfigGossipTrigger,
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/catalog/nstree/by_name_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func (t byNameMap) getByName(parentID, parentSchemaID descpb.ID, name string) ca
return got
}

func (t byNameMap) delete(d catalog.NameKey) {
delete(t.t, makeByNameItem(d).get())
func (t byNameMap) delete(d catalog.NameKey) (removed catalog.NameEntry) {
removed, _ = delete(t.t, makeByNameItem(d).get()).(catalog.NameEntry)
return removed
}

func (t byNameMap) clear() {
Expand Down
33 changes: 28 additions & 5 deletions pkg/sql/catalog/nstree/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package nstree
import (
"context"
"strings"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -26,6 +27,7 @@ import (
// thereof.
type Catalog struct {
underlying Map
byteSize int64
}

// ForEachDescriptorEntry iterates over all descriptor table entries in an
Expand Down Expand Up @@ -208,6 +210,11 @@ func (c Catalog) ValidateWithRecover(
return c.Validate(ctx, version, catalog.NoValidationTelemetry, catalog.ValidationLevelAllPreTxnCommit, desc)
}

// ByteSize returns memory usage of the underlying map in bytes.
func (c Catalog) ByteSize() int64 {
return c.byteSize
}

// MutableCatalog is like Catalog but mutable.
type MutableCatalog struct {
Catalog
Expand All @@ -219,7 +226,10 @@ func (mc *MutableCatalog) UpsertDescriptorEntry(desc catalog.Descriptor) {
return
}
mc.underlying.maybeInitialize()
mc.underlying.byID.upsert(desc)
if replaced := mc.underlying.byID.upsert(desc); replaced != nil {
mc.byteSize -= replaced.(catalog.Descriptor).ByteSize()
}
mc.byteSize += desc.ByteSize()
}

// DeleteDescriptorEntry removes a descriptor from the MutableCatalog.
Expand All @@ -228,7 +238,9 @@ func (mc *MutableCatalog) DeleteDescriptorEntry(id descpb.ID) {
return
}
mc.underlying.maybeInitialize()
mc.underlying.byID.delete(id)
if removed := mc.underlying.byID.delete(id); removed != nil {
mc.byteSize -= removed.(catalog.Descriptor).ByteSize()
}
}

// UpsertNamespaceEntry adds a name -> id mapping to the MutableCatalog.
Expand All @@ -237,14 +249,18 @@ func (mc *MutableCatalog) UpsertNamespaceEntry(key catalog.NameKey, id descpb.ID
return
}
mc.underlying.maybeInitialize()
mc.underlying.byName.upsert(&namespaceEntry{
nsEntry := &namespaceEntry{
NameInfo: descpb.NameInfo{
ParentID: key.GetParentID(),
ParentSchemaID: key.GetParentSchemaID(),
Name: key.GetName(),
},
ID: id,
})
}
if replaced := mc.underlying.byName.upsert(nsEntry); replaced != nil {
mc.byteSize -= replaced.(*namespaceEntry).ByteSize()
}
mc.byteSize += nsEntry.ByteSize()
}

// DeleteNamespaceEntry removes a name -> id mapping from the MutableCatalog.
Expand All @@ -253,7 +269,9 @@ func (mc *MutableCatalog) DeleteNamespaceEntry(key catalog.NameKey) {
return
}
mc.underlying.maybeInitialize()
mc.underlying.byName.delete(key)
if removed := mc.underlying.byName.delete(key); removed != nil {
mc.byteSize -= removed.(*namespaceEntry).ByteSize()
}
}

// Clear empties the MutableCatalog.
Expand All @@ -272,3 +290,8 @@ var _ catalog.NameEntry = namespaceEntry{}
func (e namespaceEntry) GetID() descpb.ID {
return e.ID
}

// ByteSize returns the number of bytes a namespaceEntry object takes.
func (e namespaceEntry) ByteSize() int64 {
return int64(e.NameInfo.Size()) + int64(unsafe.Sizeof(e.ID))
}
Loading

0 comments on commit 7f16831

Please sign in to comment.