Skip to content

Commit

Permalink
sql: implement sequence caching
Browse files Browse the repository at this point in the history
Previously, incrementing sequences at a high throughput
would result in many distributed writes to the KV layer
due to MVCC. This has caused garbage collection problems
in the past. This would occur in situations such as
bulk importing data while using the sequence number as an
id for each new row being added.

This change allows clients to cache sequence numbers in their local
session data. When the cache is empty, the sequence will be
incremented once by the cache size * increment amount, which are
both sequence options. Then, all the intermediate values will be
cached locally on a node to be given out whenever the sequence is
incremented.

To accommodate schema changes, cached sequence values will be
invalidated when new descriptor versions are seen by the cache.
This invalidation can occur when old versions are seen as well
to accommodate schema change rollbacks.

Release note (sql change): Using the CACHE sequence option no longer
results in an "unimplemented" error. The CACHE option is now fully
implemented and will allow nodes to cache sequence numbers. A cache
size of 1 means that there is no cache, and cache sizes of less than 1
are not valid.
  • Loading branch information
jayshrivastava committed Feb 4, 2021
1 parent 11db4d0 commit 850a09e
Show file tree
Hide file tree
Showing 11 changed files with 914 additions and 407 deletions.
800 changes: 415 additions & 385 deletions pkg/sql/catalog/descpb/structured.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ message TableDescriptor {
}

optional SequenceOwner sequence_owner = 6 [(gogoproto.nullable) = false];

// The number of values (which have already been created in KV)
// that a node can cache locally.
optional int64 cache_size = 7 [(gogoproto.nullable) = false];
}

// The presence of sequence_opts indicates that this descriptor is for a sequence.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ var (
MinValue: 1,
MaxValue: math.MaxInt64,
Start: 1,
CacheSize: 1,
},
Privileges: descpb.NewCustomSuperuserPrivilegeDescriptor(
descpb.SystemAllowedPrivileges[keys.DescIDSequenceID], security.NodeUserName()),
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2272,6 +2272,11 @@ func (m *sessionDataMutator) SetNoticeDisplaySeverity(severity pgnotice.DisplayS
m.data.NoticeDisplaySeverity = severity
}

// initSequenceCache installs a freshly constructed, empty SequenceCache on the
// session data, replacing whatever value was stored there before.
func (m *sessionDataMutator) initSequenceCache() {
	emptyCache := sessiondata.SequenceCache{}
	m.data.SequenceCache = emptyCache
}

type sqlStatsCollector struct {
// sqlStats tracks per-application statistics for all applications on each
// node.
Expand Down
73 changes: 62 additions & 11 deletions pkg/sql/logictest/testdata/logic_test/sequences
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,11 @@ CREATE SEQUENCE limit_test MAXVALUE 10 START WITH 11
statement error pgcode 22023 START value \(5\) cannot be less than MINVALUE \(10\)
CREATE SEQUENCE limit_test MINVALUE 10 START WITH 5

statement error pgcode 22023 CACHE \(-1\) must be greater than zero
CREATE SEQUENCE cache_test CACHE -1

statement error pgcode 22023 CACHE \(0\) must be greater than zero
CREATE SEQUENCE cache_test CACHE 0

statement error pgcode 0A000 CACHE values larger than 1 are not supported, found 5
CREATE SEQUENCE cache_test CACHE 5

statement error pgcode 0A000 CYCLE option is not supported
CREATE SEQUENCE cycle_test CYCLE

statement ok
CREATE SEQUENCE ignored_options_test CACHE 1 NO CYCLE
CREATE SEQUENCE ignored_options_test NO CYCLE

# Verify presence in crdb_internal.create_statements.

Expand All @@ -120,7 +111,7 @@ query ITTITTTTTTTB colnames
SELECT * FROM crdb_internal.create_statements WHERE descriptor_name = 'show_create_test'
----
database_id database_name schema_name descriptor_id descriptor_type descriptor_name create_statement state create_nofks alter_statements validate_statements has_partitions
52 test public 66 sequence show_create_test CREATE SEQUENCE public.show_create_test MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 1 START 1 PUBLIC CREATE SEQUENCE public.show_create_test MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 1 START 1 {} {} false
52 test public 63 sequence show_create_test CREATE SEQUENCE public.show_create_test MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 1 START 1 PUBLIC CREATE SEQUENCE public.show_create_test MINVALUE 1 MAXVALUE 9223372036854775807 INCREMENT 1 START 1 {} {} false

query TT colnames
SHOW CREATE SEQUENCE show_create_test
Expand Down Expand Up @@ -1278,3 +1269,63 @@ ALTER SEQUENCE db2.seq OWNED BY db1.t.a

statement ok
CREATE SEQUENCE db2.seq2 OWNED BY db1.t.a

subtest cached_sequences

statement error pgcode 22023 CACHE \(-1\) must be greater than zero
CREATE SEQUENCE cache_test CACHE -1

statement error pgcode 22023 CACHE \(0\) must be greater than zero
CREATE SEQUENCE cache_test CACHE 0

statement ok
CREATE SEQUENCE cache_test CACHE 10 INCREMENT 1

# Verify cache invalidation with schema changes.

# 10 values (1,2,...,10) are cached, and the underlying sequence is incremented to 10.
query I
SELECT nextval('cache_test')
----
1

query I
SELECT last_value FROM cache_test
----
10

statement ok
BEGIN

statement ok
ALTER SEQUENCE cache_test INCREMENT 5

# The cache is invalidated due to the above schema change, and 10 new values (15,20,...,60) are cached.
query I
SELECT nextval('cache_test')
----
15

# Rollback the schema change to use the old INCREMENT amount.
statement ok
ABORT

# The underlying sequence was still incremented despite the txn being aborted.
query I
SELECT last_value FROM cache_test
----
60

# 10 new values (61,62,...,70) are cached.
query I
SELECT nextval('cache_test')
----
61

query I
SELECT last_value FROM cache_test
----
70

statement ok
DROP SEQUENCE cache_test
10 changes: 10 additions & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,16 @@ func (p *planner) ExecCfg() *ExecutorConfig {
return p.extendedEvalCtx.ExecCfg
}

// GetOrInitSequenceCache returns the session's sequence cache. When the
// session data does not hold one yet (the field is nil), an empty cache is
// first installed through the session data mutator and then returned.
func (p *planner) GetOrInitSequenceCache() sessiondata.SequenceCache {
	// Fast path: the cache was already created earlier in this session.
	if existing := p.SessionData().SequenceCache; existing != nil {
		return existing
	}
	p.sessionDataMutator.initSequenceCache()
	return p.SessionData().SequenceCache
}

// LeaseMgr returns the lease manager exposed by the planner's descriptor
// collection (p.Descriptors()).
func (p *planner) LeaseMgr() *lease.Manager {
	descs := p.Descriptors()
	return descs.LeaseManager()
}
Expand Down
66 changes: 56 additions & 10 deletions pkg/sql/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,72 @@ func (p *planner) IncrementSequence(ctx context.Context, seqName *tree.TableName
}

seqOpts := descriptor.GetSequenceOpts()

var val int64
if seqOpts.Virtual {
rowid := builtins.GenerateUniqueInt(p.EvalContext().NodeID.SQLInstanceID())
val = int64(rowid)
} else {
val, err = p.incrementSequenceUsingCache(ctx, descriptor)
}
if err != nil {
return 0, err
}

p.ExtendedEvalContext().SessionMutator.RecordLatestSequenceVal(uint32(descriptor.GetID()), val)

return val, nil
}

// incrementSequenceUsingCache returns the next value of the sequence described
// by the passed catalog.TableDescriptor.
//
// With an effective cache size of 1 the sequence's KV value is incremented
// directly on every call. With a larger cache size the value is obtained from
// the session's sequence cache (see GetOrInitSequenceCache), which is handed
// the descriptor ID, the descriptor version and fetchNextValues as the callback
// used to reserve a new range of values.
//
// NOTE(review): this listing appears to come from a rendered diff without +/-
// markers; the statements inside the closure that assign to `val` or return two
// values look like the removed pre-change lines. Confirm against the committed
// file before relying on them.
func (p *planner) incrementSequenceUsingCache(
ctx context.Context, descriptor catalog.TableDescriptor,
) (int64, error) {
seqOpts := descriptor.GetSequenceOpts()

// A cache size of 1 means that there is no cache. Prior to #51259, sequence
// caching was unimplemented and cache sizes were left uninitialized (i.e. to have a value of 0).
// If a sequence has a cache size of 0, it should be treated in the same way as sequences
// with cache sizes of 1.
cacheSize := seqOpts.CacheSize
if cacheSize == 0 {
cacheSize = 1
}

// fetchNextValues advances the stored sequence value by Increment*cacheSize in
// a single KV increment. On success it returns the first value of the reserved
// range, the increment between consecutive values, and the number of values
// reserved. An IntegerOverflowError from the KV increment, or an end value
// outside [MinValue, MaxValue], is reported as boundsExceededError.
fetchNextValues := func() (int64, int64, int64, error) {
seqValueKey := p.ExecCfg().Codec.SequenceKey(uint32(descriptor.GetID()))
val, err = kv.IncrementValRetryable(
ctx, p.txn.DB(), seqValueKey, seqOpts.Increment)

endValue, err := kv.IncrementValRetryable(
ctx, p.txn.DB(), seqValueKey, seqOpts.Increment*cacheSize)

if err != nil {
if errors.HasType(err, (*roachpb.IntegerOverflowError)(nil)) {
return 0, boundsExceededError(descriptor)
return 0, 0, 0, boundsExceededError(descriptor)
}
return 0, err
return 0, 0, 0, err
}
if val > seqOpts.MaxValue || val < seqOpts.MinValue {
return 0, boundsExceededError(descriptor)
if endValue > seqOpts.MaxValue || endValue < seqOpts.MinValue {
return 0, 0, 0, boundsExceededError(descriptor)
}
}

p.ExtendedEvalContext().SessionMutator.RecordLatestSequenceVal(uint32(descriptor.GetID()), val)
// endValue is the last value of the reserved range; stepping back by
// cacheSize-1 increments yields the first value of that range.
return endValue - seqOpts.Increment*(cacheSize-1), seqOpts.Increment, cacheSize, nil
}

var val int64
var err error
if cacheSize == 1 {
// No caching: reserve exactly one value and return it; the increment and
// count results are not needed here.
val, _, _, err = fetchNextValues()
if err != nil {
return 0, err
}
} else {
// Cached path: the session cache decides whether to hand out a stored value
// or to call fetchNextValues for a new range.
val, err = p.GetOrInitSequenceCache().NextValue(uint32(descriptor.GetID()), uint32(descriptor.GetVersion()), fetchNextValues)
if err != nil {
return 0, err
}
}
return val, nil
}

Expand Down Expand Up @@ -263,6 +308,8 @@ func assignSequenceOptions(
opts.MaxValue = -1
opts.Start = opts.MaxValue
}
// No Caching
opts.CacheSize = 1
}

// Fill in all other options.
Expand Down Expand Up @@ -290,8 +337,7 @@ func assignSequenceOptions(
case v == 1:
// Do nothing; this is the default.
case v > 1:
return unimplemented.NewWithIssuef(32567,
"CACHE values larger than 1 are not supported, found %d", v)
opts.CacheSize = *option.IntVal
}
case tree.SeqOptIncrement:
// Do nothing; this has already been set.
Expand Down
Loading

0 comments on commit 850a09e

Please sign in to comment.