Skip to content

Commit

Permalink
sqlinstance: make Start async, use in-memory copy of self to bootstrap
Browse files Browse the repository at this point in the history
This solves bullet 4 of cockroachdb#85737

Release note: None
  • Loading branch information
ajwerner committed Nov 29, 2022
1 parent 1a6e9f8 commit 099e1fe
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 56 deletions.
9 changes: 4 additions & 5 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ func (s *SQLServer) preStart(
return err
}
// Acquire our instance ID.
instanceID, err := s.sqlInstanceStorage.CreateInstance(
instance, err := s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.distSQLServer.Locality)
if err != nil {
return err
Expand All @@ -1333,16 +1333,15 @@ func (s *SQLServer) preStart(
// Hand the instance ID to everybody who needs it, unless the sqlIDContainer
// has already been initialized with a node ID. In that case we don't need to
// initialize a SQL instance ID, as this is not a SQL pod server.
if err := s.setInstanceID(ctx, instanceID, session.ID()); err != nil {
log.Infof(ctx, "bound sqlinstance: %v", instance)
if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
return err
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, and if the reader doesn't see
// it, we'd be unable to plan any queries.
if err := s.sqlInstanceReader.Start(ctx); err != nil {
return err
}
s.sqlInstanceReader.Start(ctx, instance)
}

s.connManager = connManager
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
"//pkg/roachpb",
"//pkg/sql/sqlliveness",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
],
)

Expand Down
43 changes: 27 additions & 16 deletions pkg/sql/sqlinstance/instancestorage/instancereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Reader struct {
initialScanDone chan struct{}
mu struct {
syncutil.Mutex
self sqlinstance.InstanceInfo
instances map[base.SQLInstanceID]instancerow
startError error
started bool
Expand Down Expand Up @@ -87,18 +88,30 @@ func NewReader(
}

// Start initializes the rangefeed for the Reader. The rangefeed will run until
// the stopper stops.
func (r *Reader) Start(ctx context.Context) error {
rf := r.maybeStartRangeFeed(ctx)
// the stopper stops. If self has a non-zero ID, it will be used to
// initialize the set of instances before the rangefeed catches up.
func (r *Reader) Start(ctx context.Context, self sqlinstance.InstanceInfo) {
if r.mu.started {
return
}
if self.InstanceID != 0 {
r.mu.instances[self.InstanceID] = instancerow{
region: self.Region,
instanceID: self.InstanceID,
addr: self.InstanceAddr,
sessionID: self.SessionID,
locality: self.Locality,
timestamp: hlc.Timestamp{}, // intentionally zero
}
}
r.startRangeFeed(ctx)
}

// WaitForStarted will block until the Reader has a full snapshot of all the
// instances.
func (r *Reader) WaitForStarted(ctx context.Context) error {
select {
case <-r.initialScanDone:
// TODO(rimadeodhar): Avoid blocking on initial
// scan until first call to read.
if rf != nil {
// Add rangefeed to the stopper to ensure it
// is shutdown correctly.
r.stopper.AddCloser(rf)
}
return r.checkStarted()
case <-r.stopper.ShouldQuiesce():
return errors.Wrap(stop.ErrUnavailable,
Expand All @@ -108,11 +121,8 @@ func (r *Reader) Start(ctx context.Context) error {
"failed to retrieve initial instance data")
}
}
func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
if r.started() {
// Nothing to do, return
return nil
}

func (r *Reader) startRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
updateCacheFn := func(
ctx context.Context, keyVal *roachpb.RangeFeedValue,
) {
Expand Down Expand Up @@ -153,12 +163,13 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
rangefeed.WithOnInitialScanError(initialScanErrFn),
rangefeed.WithRowTimestampInInitialScan(true),
)
r.setStarted()
defer r.setStarted()
if err != nil {
r.setStartError(err)
close(r.initialScanDone)
return nil
}
r.stopper.AddCloser(rf)
return rf
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/sqlinstance/instancestorage/instancereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestReader(t *testing.T) {

t.Run("basic-get-instance-data", func(t *testing.T) {
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
sessionID := makeSession()
const addr = "addr"
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}}
Expand All @@ -81,7 +82,7 @@ func TestReader(t *testing.T) {
const expiration = 10 * time.Minute
{
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
if err != nil {
t.Fatal(err)
}
Expand All @@ -90,7 +91,7 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}
testutils.SucceedsSoon(t, func() error {
instanceInfo, err := reader.GetInstance(ctx, id)
instanceInfo, err := reader.GetInstance(ctx, instance.InstanceID)
if err != nil {
return err
}
Expand All @@ -109,7 +110,8 @@ func TestReader(t *testing.T) {
// live through the test.
const expiration = 10 * time.Minute
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))

// Set up expected test data.
region := enum.One
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestReader(t *testing.T) {
sessionID := makeSession()
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region4"}}}
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality)
if err != nil {
t.Fatal(err)
}
Expand All @@ -219,7 +221,7 @@ func TestReader(t *testing.T) {
return err
}
sortInstances(instances)
return testOutputFn([]base.SQLInstanceID{id}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, []roachpb.Locality{locality}, instances)
return testOutputFn([]base.SQLInstanceID{instance.InstanceID}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, []roachpb.Locality{locality}, instances)
})
}
})
Expand All @@ -228,7 +230,8 @@ func TestReader(t *testing.T) {
// live through the test.
const expiration = 10 * time.Minute
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
// Create three instances and release one.
region := enum.One
instanceIDs := [...]base.SQLInstanceID{1, 2, 3}
Expand Down
21 changes: 14 additions & 7 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -139,17 +140,17 @@ func (s *Storage) CreateInstance(
sessionExpiration hlc.Timestamp,
addr string,
locality roachpb.Locality,
) (instanceID base.SQLInstanceID, _ error) {
) (instance sqlinstance.InstanceInfo, _ error) {
if len(addr) == 0 {
return base.SQLInstanceID(0), errors.New("no address information for instance")
return sqlinstance.InstanceInfo{}, errors.New("no address information for instance")
}
if len(sessionID) == 0 {
return base.SQLInstanceID(0), errors.New("no session information for instance")
return sqlinstance.InstanceInfo{}, errors.New("no session information for instance")
}

region, _, err := slstorage.UnsafeDecodeSessionID(sessionID)
if err != nil {
return base.SQLInstanceID(0), errors.Wrap(err, "unable to determine region for sql_instance")
return sqlinstance.InstanceInfo{}, errors.Wrap(err, "unable to determine region for sql_instance")
}

// TODO(jeffswenson): advance session expiration. This can get stuck in a
Expand Down Expand Up @@ -206,10 +207,16 @@ func (s *Storage) CreateInstance(
instanceID, err := assignInstance()
// Instance was successfully assigned an ID.
if err == nil {
return instanceID, err
return sqlinstance.InstanceInfo{
Region: region,
InstanceID: instanceID,
InstanceAddr: addr,
SessionID: sessionID,
Locality: locality,
}, nil
}
if !errors.Is(err, errNoPreallocatedRows) {
return base.SQLInstanceID(0), err
return sqlinstance.InstanceInfo{}, err
}
// If assignInstance failed because there are no available rows,
// allocate new instance IDs for the local region.
Expand All @@ -229,7 +236,7 @@ func (s *Storage) CreateInstance(
}

// If we exit here, it has to be the case where the context has expired.
return base.SQLInstanceID(0), ctx.Err()
return sqlinstance.InstanceInfo{}, ctx.Err()
}

// getAvailableInstanceIDForRegion retrieves an available instance ID for the
Expand Down
39 changes: 18 additions & 21 deletions pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func TestStorage(t *testing.T) {
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}}
const expiration = time.Minute
{
instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
require.NoError(t, err)
require.Equal(t, id, instanceID)
require.Equal(t, id, instance.InstanceID)
}
})

Expand All @@ -118,10 +118,10 @@ func TestStorage(t *testing.T) {
}
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
for _, index := range []int{0, 1, 2} {
instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
instance, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
require.NoError(t, err)
require.NoError(t, slStorage.Insert(ctx, sessionIDs[index], sessionExpiry))
require.Equal(t, instanceIDs[index], instanceID)
require.Equal(t, instanceIDs[index], instance.InstanceID)
}

// Verify all instances are returned by GetAllInstancesDataForTest.
Expand All @@ -146,10 +146,10 @@ func TestStorage(t *testing.T) {

// Create two more instances.
for _, index := range []int{3, 4} {
instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
instance, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
require.NoError(t, err)
require.NoError(t, slStorage.Insert(ctx, sessionIDs[index], sessionExpiry))
require.Equal(t, instanceIDs[index], instanceID)
require.Equal(t, instanceIDs[index], instance.InstanceID)
}

// Verify all instances are returned by GetAllInstancesDataForTest.
Expand Down Expand Up @@ -188,11 +188,10 @@ func TestStorage(t *testing.T) {
{
require.NoError(t, slStorage.Delete(ctx, sessionIDs[4]))
var err error
var instanceID base.SQLInstanceID
require.NoError(t, slStorage.Insert(ctx, sessionID6, sessionExpiry))
instanceID, err = storage.CreateInstance(ctx, sessionID6, sessionExpiry, addr6, locality6)
instance, err := storage.CreateInstance(ctx, sessionID6, sessionExpiry, addr6, locality6)
require.NoError(t, err)
require.Equal(t, instanceIDs[4], instanceID)
require.Equal(t, instanceIDs[4], instance.InstanceID)
var instances []sqlinstance.InstanceInfo
instances, err = storage.GetAllInstancesDataForTest(ctx)
sortInstances(instances)
Expand All @@ -216,14 +215,12 @@ func TestStorage(t *testing.T) {

// Verify released instance ID gets reused.
{
var err error
var instanceID base.SQLInstanceID
newSessionID := makeSession()
newAddr := "addr7"
newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region7"}}}
instanceID, err = storage.CreateInstance(ctx, newSessionID, sessionExpiry, newAddr, newLocality)
instance, err := storage.CreateInstance(ctx, newSessionID, sessionExpiry, newAddr, newLocality)
require.NoError(t, err)
require.Equal(t, instanceIDs[0], instanceID)
require.Equal(t, instanceIDs[0], instance.InstanceID)
var instances []sqlinstance.InstanceInfo
instances, err = storage.GetAllInstancesDataForTest(ctx)
sortInstances(instances)
Expand Down Expand Up @@ -288,7 +285,7 @@ func TestSQLAccess(t *testing.T) {
sessionID := makeSession()
var locality roachpb.Locality
require.NoError(t, locality.Set(tierStr))
instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
require.NoError(t, err)

// Query the table through SQL and verify the query completes successfully.
Expand All @@ -304,7 +301,7 @@ func TestSQLAccess(t *testing.T) {
rows.Next()
err = rows.Scan(&parsedInstanceID, &parsedAddr, &parsedSessionID, &parsedLocality)
require.NoError(t, err)
require.Equal(t, instanceID, parsedInstanceID)
require.Equal(t, instance.InstanceID, parsedInstanceID)
require.Equal(t, sessionID, sqlliveness.SessionID(parsedSessionID.String))
require.Equal(t, addr, parsedAddr.String)
require.Equal(t, localityStr, parsedLocality.String)
Expand Down Expand Up @@ -381,17 +378,17 @@ func TestConcurrentCreateAndRelease(t *testing.T) {
if err != nil {
t.Fatal(err)
}
instanceID, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
require.NoError(t, err)
if len(state.freeInstances) > 0 {
_, free := state.freeInstances[instanceID]
_, free := state.freeInstances[instance.InstanceID]
// Confirm that a free id was repurposed.
require.True(t, free)
delete(state.freeInstances, instanceID)
delete(state.freeInstances, instance.InstanceID)
}
state.liveInstances[instanceID] = struct{}{}
if instanceID > state.maxInstanceID {
state.maxInstanceID = instanceID
state.liveInstances[instance.InstanceID] = struct{}{}
if instance.InstanceID > state.maxInstanceID {
state.maxInstanceID = instance.InstanceID
}
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/sqlinstance/sqlinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package sqlinstance

import (
"context"
"encoding/base64"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/cockroachdb/redact/interfaces"
)

// InstanceInfo exposes information on a SQL instance such as ID, network
Expand All @@ -34,6 +37,25 @@ type InstanceInfo struct {
Locality roachpb.Locality
}

// SafeFormat implements redact.SafeFormatter. It writes the instance's fields
// to s in a fixed "Instance{...}" layout. The region bytes are base64-encoded
// and wrapped as a redact.SafeString; the remaining fields are handed to the
// printer unchanged. The verb argument is not consulted.
func (ii InstanceInfo) SafeFormat(s interfaces.SafePrinter, verb rune) {
	// Render the region as base64 text instead of passing the raw bytes.
	regionPrefix := redact.SafeString(base64.StdEncoding.EncodeToString(ii.Region))
	const layout = "Instance{RegionPrefix: %v, InstanceID: %d, Addr: %v, SessionID: %s, Locality: %v}"
	s.Printf(layout, regionPrefix, ii.InstanceID, ii.InstanceAddr, ii.SessionID, ii.Locality)
}

// String implements fmt.Stringer. It formats the instance through
// redact.Sprint and returns the result of StripMarkers on that output.
func (ii InstanceInfo) String() string {
	redactable := redact.Sprint(ii)
	return redactable.StripMarkers()
}

// Compile-time check that InstanceInfo satisfies redact.SafeFormatter.
var _ redact.SafeFormatter = InstanceInfo{}

// AddressResolver exposes an API for retrieving the instance address and all live instances for a tenant.
type AddressResolver interface {
// GetInstance returns the InstanceInfo for a pod given the instance ID.
Expand Down

0 comments on commit 099e1fe

Please sign in to comment.