Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlinstance: make Start async, use in-memory copy of self to bootstrap #92644

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ func (s *SQLServer) preStart(
return err
}
// Acquire our instance ID.
instanceID, err := s.sqlInstanceStorage.CreateInstance(
instance, err := s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.distSQLServer.Locality)
if err != nil {
return err
Expand All @@ -1333,16 +1333,15 @@ func (s *SQLServer) preStart(
// Hand the instance ID to everybody who needs it, unless the sqlIDContainer
// has already been initialized with a node ID. IN that case we don't need to
// initialize a SQL instance ID in this case as this is not a SQL pod server.
if err := s.setInstanceID(ctx, instanceID, session.ID()); err != nil {
log.Infof(ctx, "bound sqlinstance: %v", instance)
if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
return err
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, and if the reader doesn't see
// it, we'd be unable to plan any queries.
if err := s.sqlInstanceReader.Start(ctx); err != nil {
return err
}
s.sqlInstanceReader.Start(ctx, instance)
}

s.connManager = connManager
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
"//pkg/roachpb",
"//pkg/sql/sqlliveness",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
],
)

Expand Down
43 changes: 27 additions & 16 deletions pkg/sql/sqlinstance/instancestorage/instancereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Reader struct {
initialScanDone chan struct{}
mu struct {
syncutil.Mutex
self sqlinstance.InstanceInfo
instances map[base.SQLInstanceID]instancerow
startError error
started bool
Expand Down Expand Up @@ -87,18 +88,30 @@ func NewReader(
}

// Start initializes the rangefeed for the Reader. The rangefeed will run until
// the stopper stops.
func (r *Reader) Start(ctx context.Context) error {
rf := r.maybeStartRangeFeed(ctx)
// the stopper stops. If self has a non-zero ID, it will be used to
// initialize the set of instances before the rangefeed catches up.
func (r *Reader) Start(ctx context.Context, self sqlinstance.InstanceInfo) {
if r.mu.started {
return
}
if self.InstanceID != 0 {
r.mu.instances[self.InstanceID] = instancerow{
region: self.Region,
instanceID: self.InstanceID,
addr: self.InstanceAddr,
sessionID: self.SessionID,
locality: self.Locality,
timestamp: hlc.Timestamp{}, // intentionally zero
}
}
r.startRangeFeed(ctx)
}

// WaitForStarted will block until the Reader has a full snapshot of all the
// instances.
func (r *Reader) WaitForStarted(ctx context.Context) error {
select {
case <-r.initialScanDone:
// TODO(rimadeodhar): Avoid blocking on initial
// scan until first call to read.
if rf != nil {
// Add rangefeed to the stopper to ensure it
// is shutdown correctly.
r.stopper.AddCloser(rf)
}
return r.checkStarted()
case <-r.stopper.ShouldQuiesce():
return errors.Wrap(stop.ErrUnavailable,
Expand All @@ -108,11 +121,8 @@ func (r *Reader) Start(ctx context.Context) error {
"failed to retrieve initial instance data")
}
}
func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
if r.started() {
// Nothing to do, return
return nil
}

func (r *Reader) startRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
updateCacheFn := func(
ctx context.Context, keyVal *roachpb.RangeFeedValue,
) {
Expand Down Expand Up @@ -153,12 +163,13 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
rangefeed.WithOnInitialScanError(initialScanErrFn),
rangefeed.WithRowTimestampInInitialScan(true),
)
r.setStarted()
defer r.setStarted()
if err != nil {
r.setStartError(err)
close(r.initialScanDone)
return nil
}
r.stopper.AddCloser(rf)
return rf
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/sqlinstance/instancestorage/instancereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestReader(t *testing.T) {

t.Run("basic-get-instance-data", func(t *testing.T) {
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
sessionID := makeSession()
const addr = "addr"
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}}
Expand All @@ -81,7 +82,7 @@ func TestReader(t *testing.T) {
const expiration = 10 * time.Minute
{
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
if err != nil {
t.Fatal(err)
}
Expand All @@ -90,7 +91,7 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}
testutils.SucceedsSoon(t, func() error {
instanceInfo, err := reader.GetInstance(ctx, id)
instanceInfo, err := reader.GetInstance(ctx, instance.InstanceID)
if err != nil {
return err
}
Expand All @@ -109,7 +110,8 @@ func TestReader(t *testing.T) {
// live through the test.
const expiration = 10 * time.Minute
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))

// Set up expected test data.
region := enum.One
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestReader(t *testing.T) {
sessionID := makeSession()
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region4"}}}
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality)
if err != nil {
t.Fatal(err)
}
Expand All @@ -219,7 +221,7 @@ func TestReader(t *testing.T) {
return err
}
sortInstances(instances)
return testOutputFn([]base.SQLInstanceID{id}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, []roachpb.Locality{locality}, instances)
return testOutputFn([]base.SQLInstanceID{instance.InstanceID}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, []roachpb.Locality{locality}, instances)
})
}
})
Expand All @@ -228,7 +230,8 @@ func TestReader(t *testing.T) {
// live through the test.
const expiration = 10 * time.Minute
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
// Create three instances and release one.
region := enum.One
instanceIDs := [...]base.SQLInstanceID{1, 2, 3}
Expand Down
21 changes: 14 additions & 7 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -139,17 +140,17 @@ func (s *Storage) CreateInstance(
sessionExpiration hlc.Timestamp,
addr string,
locality roachpb.Locality,
) (instanceID base.SQLInstanceID, _ error) {
) (instance sqlinstance.InstanceInfo, _ error) {
if len(addr) == 0 {
return base.SQLInstanceID(0), errors.New("no address information for instance")
return sqlinstance.InstanceInfo{}, errors.New("no address information for instance")
}
if len(sessionID) == 0 {
return base.SQLInstanceID(0), errors.New("no session information for instance")
return sqlinstance.InstanceInfo{}, errors.New("no session information for instance")
}

region, _, err := slstorage.UnsafeDecodeSessionID(sessionID)
if err != nil {
return base.SQLInstanceID(0), errors.Wrap(err, "unable to determine region for sql_instance")
return sqlinstance.InstanceInfo{}, errors.Wrap(err, "unable to determine region for sql_instance")
}

// TODO(jeffswenson): advance session expiration. This can get stuck in a
Expand Down Expand Up @@ -206,10 +207,16 @@ func (s *Storage) CreateInstance(
instanceID, err := assignInstance()
// Instance was successfully assigned an ID.
if err == nil {
return instanceID, err
return sqlinstance.InstanceInfo{
Region: region,
InstanceID: instanceID,
InstanceAddr: addr,
SessionID: sessionID,
Locality: locality,
}, nil
}
if !errors.Is(err, errNoPreallocatedRows) {
return base.SQLInstanceID(0), err
return sqlinstance.InstanceInfo{}, err
}
// If assignInstance failed because there are no available rows,
// allocate new instance IDs for the local region.
Expand All @@ -229,7 +236,7 @@ func (s *Storage) CreateInstance(
}

// If we exit here, it has to be the case where the context has expired.
return base.SQLInstanceID(0), ctx.Err()
return sqlinstance.InstanceInfo{}, ctx.Err()
}

// getAvailableInstanceIDForRegion retrieves an available instance ID for the
Expand Down
39 changes: 18 additions & 21 deletions pkg/sql/sqlinstance/instancestorage/instancestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func TestStorage(t *testing.T) {
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}}
const expiration = time.Minute
{
instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
require.NoError(t, err)
require.Equal(t, id, instanceID)
require.Equal(t, id, instance.InstanceID)
}
})

Expand All @@ -118,10 +118,10 @@ func TestStorage(t *testing.T) {
}
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
for _, index := range []int{0, 1, 2} {
instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
instance, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
require.NoError(t, err)
require.NoError(t, slStorage.Insert(ctx, sessionIDs[index], sessionExpiry))
require.Equal(t, instanceIDs[index], instanceID)
require.Equal(t, instanceIDs[index], instance.InstanceID)
}

// Verify all instances are returned by GetAllInstancesDataForTest.
Expand All @@ -146,10 +146,10 @@ func TestStorage(t *testing.T) {

// Create two more instances.
for _, index := range []int{3, 4} {
instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
instance, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index])
require.NoError(t, err)
require.NoError(t, slStorage.Insert(ctx, sessionIDs[index], sessionExpiry))
require.Equal(t, instanceIDs[index], instanceID)
require.Equal(t, instanceIDs[index], instance.InstanceID)
}

// Verify all instances are returned by GetAllInstancesDataForTest.
Expand Down Expand Up @@ -188,11 +188,10 @@ func TestStorage(t *testing.T) {
{
require.NoError(t, slStorage.Delete(ctx, sessionIDs[4]))
var err error
var instanceID base.SQLInstanceID
require.NoError(t, slStorage.Insert(ctx, sessionID6, sessionExpiry))
instanceID, err = storage.CreateInstance(ctx, sessionID6, sessionExpiry, addr6, locality6)
instance, err := storage.CreateInstance(ctx, sessionID6, sessionExpiry, addr6, locality6)
require.NoError(t, err)
require.Equal(t, instanceIDs[4], instanceID)
require.Equal(t, instanceIDs[4], instance.InstanceID)
var instances []sqlinstance.InstanceInfo
instances, err = storage.GetAllInstancesDataForTest(ctx)
sortInstances(instances)
Expand All @@ -216,14 +215,12 @@ func TestStorage(t *testing.T) {

// Verify released instance ID gets reused.
{
var err error
var instanceID base.SQLInstanceID
newSessionID := makeSession()
newAddr := "addr7"
newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region7"}}}
instanceID, err = storage.CreateInstance(ctx, newSessionID, sessionExpiry, newAddr, newLocality)
instance, err := storage.CreateInstance(ctx, newSessionID, sessionExpiry, newAddr, newLocality)
require.NoError(t, err)
require.Equal(t, instanceIDs[0], instanceID)
require.Equal(t, instanceIDs[0], instance.InstanceID)
var instances []sqlinstance.InstanceInfo
instances, err = storage.GetAllInstancesDataForTest(ctx)
sortInstances(instances)
Expand Down Expand Up @@ -288,7 +285,7 @@ func TestSQLAccess(t *testing.T) {
sessionID := makeSession()
var locality roachpb.Locality
require.NoError(t, locality.Set(tierStr))
instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality)
require.NoError(t, err)

// Query the table through SQL and verify the query completes successfully.
Expand All @@ -304,7 +301,7 @@ func TestSQLAccess(t *testing.T) {
rows.Next()
err = rows.Scan(&parsedInstanceID, &parsedAddr, &parsedSessionID, &parsedLocality)
require.NoError(t, err)
require.Equal(t, instanceID, parsedInstanceID)
require.Equal(t, instance.InstanceID, parsedInstanceID)
require.Equal(t, sessionID, sqlliveness.SessionID(parsedSessionID.String))
require.Equal(t, addr, parsedAddr.String)
require.Equal(t, localityStr, parsedLocality.String)
Expand Down Expand Up @@ -381,17 +378,17 @@ func TestConcurrentCreateAndRelease(t *testing.T) {
if err != nil {
t.Fatal(err)
}
instanceID, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
require.NoError(t, err)
if len(state.freeInstances) > 0 {
_, free := state.freeInstances[instanceID]
_, free := state.freeInstances[instance.InstanceID]
// Confirm that a free id was repurposed.
require.True(t, free)
delete(state.freeInstances, instanceID)
delete(state.freeInstances, instance.InstanceID)
}
state.liveInstances[instanceID] = struct{}{}
if instanceID > state.maxInstanceID {
state.maxInstanceID = instanceID
state.liveInstances[instance.InstanceID] = struct{}{}
if instance.InstanceID > state.maxInstanceID {
state.maxInstanceID = instance.InstanceID
}
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/sqlinstance/sqlinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package sqlinstance

import (
"context"
"encoding/base64"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/cockroachdb/redact/interfaces"
)

// InstanceInfo exposes information on a SQL instance such as ID, network
Expand All @@ -34,6 +37,25 @@ type InstanceInfo struct {
Locality roachpb.Locality
}

// SafeFormat implements redact.SafeFormatter.
func (ii InstanceInfo) SafeFormat(s interfaces.SafePrinter, verb rune) {
s.Printf(
"Instance{RegionPrefix: %v, InstanceID: %d, Addr: %v, SessionID: %s, Locality: %v}",
redact.SafeString(base64.StdEncoding.EncodeToString(ii.Region)),
ii.InstanceID,
ii.InstanceAddr,
ii.SessionID,
ii.Locality,
)
}

// String implements fmt.Stringer.
func (ii InstanceInfo) String() string {
return redact.Sprint(ii).StripMarkers()
}

var _ redact.SafeFormatter = InstanceInfo{}

// AddressResolver exposes API for retrieving the instance address and all live instances for a tenant.
type AddressResolver interface {
// GetInstance returns the InstanceInfo for a pod given the instance ID.
Expand Down