sqlinstance: make Start async, use in-memory copy of self to bootstrap
This solves bullet 4 of #85737. This commit makes sqlinstance.Start asynchronous,
so it no longer blocks until the rangefeed has started.

Epic: CRDB-18596

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
jaylim-crl and ajwerner committed Nov 29, 2022
1 parent 8e4d3df commit e90c549
Showing 7 changed files with 160 additions and 114 deletions.
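For orientation, the server startup sequence in pkg/server/server_sql.go looks roughly like the following once this diff is applied. This is a consolidated sketch assembled from the hunks below, not an exact excerpt; error handling is abbreviated:

    // CreateInstance now returns the full instance row, not just the ID.
    instance, err := s.sqlInstanceStorage.CreateInstance(
        ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.distSQLServer.Locality)
    if err != nil {
        return err
    }
    if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
        return err
    }
    // Start no longer blocks on the rangefeed's initial scan. The reader is
    // seeded with the in-memory copy of our own row, so this server can plan
    // queries against itself even before any rangefeed data arrives.
    s.sqlInstanceReader.Start(ctx, instance)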
14 changes: 7 additions & 7 deletions pkg/server/server_sql.go
@@ -1314,8 +1314,8 @@ func (s *SQLServer) preStart(
); err != nil {
return err
}
// Acquire our instance ID.
instanceID, err := s.sqlInstanceStorage.CreateInstance(
// Acquire our instance row.
instance, err := s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.distSQLServer.Locality)
if err != nil {
return err
@@ -1329,16 +1329,16 @@ func (s *SQLServer) preStart(
// Hand the instance ID to everybody who needs it, unless the sqlIDContainer
// has already been initialized with a node ID. In that case we don't need to
// initialize a SQL instance ID, as this is not a SQL pod server.
if err := s.setInstanceID(ctx, instanceID, session.ID()); err != nil {
log.Infof(ctx, "bound sqlinstance: %v", instance)
if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
return err
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, and if the reader doesn't see
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
if err := s.sqlInstanceReader.Start(ctx); err != nil {
return err
}
s.sqlInstanceReader.Start(ctx, instance)
}

s.pgL = pgL
2 changes: 2 additions & 0 deletions pkg/sql/sqlinstance/BUILD.bazel
@@ -11,6 +11,8 @@ go_library(
"//pkg/roachpb",
"//pkg/sql/sqlliveness",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
],
)

106 changes: 49 additions & 57 deletions pkg/sql/sqlinstance/instancestorage/instancereader.go
@@ -33,19 +33,20 @@ import (
// Reader implements the sqlinstance.AddressResolver interface. It uses
// caching backed by rangefeed to cache instance information.
type Reader struct {
storage *Storage
slReader sqlliveness.Reader
f *rangefeed.Factory
codec keys.SQLCodec
clock *hlc.Clock
stopper *stop.Stopper
rowcodec rowCodec
storage *Storage
slReader sqlliveness.Reader
f *rangefeed.Factory
codec keys.SQLCodec
clock *hlc.Clock
stopper *stop.Stopper
rowcodec rowCodec
// Once initialScanDone is closed, the error (if any) while establishing the
// rangefeed can be found in initialScanErr.
initialScanDone chan struct{}
mu struct {
syncutil.Mutex
instances map[base.SQLInstanceID]instancerow
startError error
started bool
instances map[base.SQLInstanceID]instancerow
initialScanErr error
}
}

@@ -87,19 +88,29 @@ func NewReader(
}

// Start initializes the rangefeed for the Reader. The rangefeed will run until
// the stopper stops.
func (r *Reader) Start(ctx context.Context) error {
rf := r.maybeStartRangeFeed(ctx)
// the stopper stops. If self has a non-zero ID, it will be used to initialize
// the set of instances before the rangefeed catches up.
func (r *Reader) Start(ctx context.Context, self sqlinstance.InstanceInfo) {
if self.InstanceID != 0 {
r.updateInstanceMap(instancerow{
region: self.Region,
instanceID: self.InstanceID,
addr: self.InstanceAddr,
sessionID: self.SessionID,
locality: self.Locality,
timestamp: hlc.Timestamp{}, // intentionally zero
}, false /* deletionEvent */)
}
r.startRangeFeed(ctx)
}

// WaitForStarted will block until the Reader has an initial full snapshot of
// all the instances. If Start hasn't been called, this will block until the
// context is cancelled, or the stopper quiesces.
func (r *Reader) WaitForStarted(ctx context.Context) error {
select {
case <-r.initialScanDone:
// TODO(rimadeodhar): Avoid blocking on initial
// scan until first call to read.
if rf != nil {
// Add rangefeed to the stopper to ensure it
// is shutdown correctly.
r.stopper.AddCloser(rf)
}
return r.checkStarted()
return r.initialScanErr()
case <-r.stopper.ShouldQuiesce():
return errors.Wrap(stop.ErrUnavailable,
"failed to retrieve initial instance data")
@@ -108,11 +119,8 @@ func (r *Reader) Start(ctx context.Context) error {
"failed to retrieve initial instance data")
}
}
func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
if r.started() {
// Nothing to do, return
return nil
}

func (r *Reader) startRangeFeed(ctx context.Context) {
updateCacheFn := func(
ctx context.Context, keyVal *roachpb.RangeFeedValue,
) {
@@ -124,16 +132,15 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
r.updateInstanceMap(instance, !keyVal.Value.IsPresent())
}
initialScanDoneFn := func(_ context.Context) {
close(r.initialScanDone)
r.setInitialScanErr(nil)
}
initialScanErrFn := func(_ context.Context, err error) (shouldFail bool) {
if grpcutil.IsAuthError(err) ||
// This is a hack around the fact that we do not get properly structured
// errors out of gRPC. See #56208.
strings.Contains(err.Error(), "rpc error: code = Unauthenticated") {
shouldFail = true
r.setStartError(err)
close(r.initialScanDone)
r.setInitialScanErr(err)
}
return shouldFail
}
@@ -153,20 +160,18 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
rangefeed.WithOnInitialScanError(initialScanErrFn),
rangefeed.WithRowTimestampInInitialScan(true),
)
r.setStarted()
if err != nil {
r.setStartError(err)
close(r.initialScanDone)
return nil
r.setInitialScanErr(err)
return
}
return rf
r.stopper.AddCloser(rf)
}

// GetInstance implements sqlinstance.AddressResolver interface.
func (r *Reader) GetInstance(
ctx context.Context, instanceID base.SQLInstanceID,
) (sqlinstance.InstanceInfo, error) {
if err := r.checkStarted(); err != nil {
if err := r.initialScanErr(); err != nil {
return sqlinstance.InstanceInfo{}, err
}
r.mu.Lock()
@@ -198,7 +203,7 @@ func (r *Reader) GetInstance(
func (r *Reader) GetAllInstances(
ctx context.Context,
) (sqlInstances []sqlinstance.InstanceInfo, _ error) {
if err := r.checkStarted(); err != nil {
if err := r.initialScanErr(); err != nil {
return nil, err
}
liveInstances, err := r.getAllLiveInstances(ctx)
@@ -256,8 +261,8 @@ func (r *Reader) getAllLiveInstances(ctx context.Context) ([]instancerow, error)
return rows, nil
}

// getAllInstanceRows returns all instancerow objects contained
// within the map, in an arbitrary order.
// getAllInstanceRows returns all instancerow objects contained within the map,
// in an arbitrary order.
func (r *Reader) getAllInstanceRows() (instances []instancerow) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -277,29 +282,16 @@ func (r *Reader) updateInstanceMap(instance instancerow, deletionEvent bool) {
r.mu.instances[instance.instanceID] = instance
}

func (r *Reader) setStarted() {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.started = true
}

func (r *Reader) started() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.started
}

func (r *Reader) checkStarted() error {
func (r *Reader) initialScanErr() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.mu.started {
return sqlinstance.NotStartedError
}
return r.mu.startError
return r.mu.initialScanErr
}

func (r *Reader) setStartError(err error) {
func (r *Reader) setInitialScanErr(err error) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.startError = err
// Set error before closing done channel.
r.mu.initialScanErr = err
close(r.initialScanDone)
}
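The WaitForStarted contract above rests on the usual set-then-close idiom: setInitialScanErr records the error under the mutex before closing initialScanDone, so any goroutine that unblocks from the channel receive is guaranteed to observe the final error. Below is a self-contained sketch of that idiom; the names (oneShot, set, wait) are illustrative and not taken from this commit:

    package main

    import (
        "fmt"
        "sync"
    )

    // oneShot records a result exactly once and lets any number of waiters
    // block until it is available, mirroring the Reader's
    // initialScanErr/initialScanDone pair.
    type oneShot struct {
        mu   sync.Mutex
        err  error
        done chan struct{}
    }

    func newOneShot() *oneShot {
        return &oneShot{done: make(chan struct{})}
    }

    // set stores the error before closing done, so waiters that unblock on
    // done always see the recorded value. Calling set twice would panic on
    // the double close, just as closing initialScanDone twice would.
    func (o *oneShot) set(err error) {
        o.mu.Lock()
        defer o.mu.Unlock()
        o.err = err
        close(o.done)
    }

    // wait blocks until set has been called, then returns the recorded error.
    func (o *oneShot) wait() error {
        <-o.done
        o.mu.Lock()
        defer o.mu.Unlock()
        return o.err
    }

    func main() {
        o := newOneShot()
        go o.set(nil) // e.g. the rangefeed's initial scan completed successfully
        fmt.Println("initial scan error:", o.wait())
    }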
45 changes: 37 additions & 8 deletions pkg/sql/sqlinstance/instancestorage/instancereader_test.go
@@ -67,12 +67,33 @@ func TestReader(t *testing.T) {
t.Run("unstarted-reader", func(t *testing.T) {
_, _, _, reader := setup(t)
_, err := reader.GetInstance(ctx, 1)
require.ErrorIs(t, err, sqlinstance.NotStartedError)
require.ErrorIs(t, err, sqlinstance.NonExistentInstanceError)
})
t.Run("read-without-waiting", func(t *testing.T) {
storage, slStorage, clock, reader := setup(t)
sessionID := makeSession()
const addr = "addr"
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}}
// Set a high enough expiration to ensure the session stays
// live through the test.
const expiration = 10 * time.Minute
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
require.NoError(t, err)
err = slStorage.Insert(ctx, sessionID, sessionExpiry)
require.NoError(t, err)
reader.Start(ctx, instance)

// Attempt to get instance without waiting.
instanceInfo, err := reader.GetInstance(ctx, instance.InstanceID)
require.NoError(t, err)
require.Equal(t, addr, instanceInfo.InstanceAddr)
require.Equal(t, locality, instanceInfo.Locality)
})
t.Run("basic-get-instance-data", func(t *testing.T) {
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
sessionID := makeSession()
const addr = "addr"
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}}
@@ -81,7 +102,7 @@ func TestReader(t *testing.T) {
const expiration = 10 * time.Minute
{
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addr, locality)
if err != nil {
t.Fatal(err)
}
@@ -90,7 +111,7 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}
testutils.SucceedsSoon(t, func() error {
instanceInfo, err := reader.GetInstance(ctx, id)
instanceInfo, err := reader.GetInstance(ctx, instance.InstanceID)
if err != nil {
return err
}
@@ -109,7 +130,8 @@ func TestReader(t *testing.T) {
// live through the test.
const expiration = 10 * time.Minute
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))

// Set up expected test data.
region := enum.One
@@ -205,7 +227,7 @@ func TestReader(t *testing.T) {
sessionID := makeSession()
locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region4"}}}
sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0)
id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality)
instance, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality)
if err != nil {
t.Fatal(err)
}
@@ -219,7 +241,13 @@ func TestReader(t *testing.T) {
return err
}
sortInstances(instances)
return testOutputFn([]base.SQLInstanceID{id}, []string{addresses[2]}, []sqlliveness.SessionID{sessionID}, []roachpb.Locality{locality}, instances)
return testOutputFn(
[]base.SQLInstanceID{instance.InstanceID},
[]string{addresses[2]},
[]sqlliveness.SessionID{sessionID},
[]roachpb.Locality{locality},
instances,
)
})
}
})
@@ -228,7 +256,8 @@ func TestReader(t *testing.T) {
// live through the test.
const expiration = 10 * time.Minute
storage, slStorage, clock, reader := setup(t)
require.NoError(t, reader.Start(ctx))
reader.Start(ctx, sqlinstance.InstanceInfo{})
require.NoError(t, reader.WaitForStarted(ctx))
// Create three instances and release one.
region := enum.One
instanceIDs := [...]base.SQLInstanceID{1, 2, 3}
21 changes: 14 additions & 7 deletions pkg/sql/sqlinstance/instancestorage/instancestorage.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -139,17 +140,17 @@ func (s *Storage) CreateInstance(
sessionExpiration hlc.Timestamp,
addr string,
locality roachpb.Locality,
) (instanceID base.SQLInstanceID, _ error) {
) (instance sqlinstance.InstanceInfo, _ error) {
if len(addr) == 0 {
return base.SQLInstanceID(0), errors.New("no address information for instance")
return sqlinstance.InstanceInfo{}, errors.New("no address information for instance")
}
if len(sessionID) == 0 {
return base.SQLInstanceID(0), errors.New("no session information for instance")
return sqlinstance.InstanceInfo{}, errors.New("no session information for instance")
}

region, _, err := slstorage.UnsafeDecodeSessionID(sessionID)
if err != nil {
return base.SQLInstanceID(0), errors.Wrap(err, "unable to determine region for sql_instance")
return sqlinstance.InstanceInfo{}, errors.Wrap(err, "unable to determine region for sql_instance")
}

// TODO(jeffswenson): advance session expiration. This can get stuck in a
Expand Down Expand Up @@ -206,10 +207,16 @@ func (s *Storage) CreateInstance(
instanceID, err := assignInstance()
// Instance was successfully assigned an ID.
if err == nil {
return instanceID, err
return sqlinstance.InstanceInfo{
Region: region,
InstanceID: instanceID,
InstanceAddr: addr,
SessionID: sessionID,
Locality: locality,
}, err
}
if !errors.Is(err, errNoPreallocatedRows) {
return base.SQLInstanceID(0), err
return sqlinstance.InstanceInfo{}, err
}
// If assignInstance failed because there are no available rows,
// allocate new instance IDs for the local region.
@@ -229,7 +236,7 @@
}

// If we exit here, it has to be the case where the context has expired.
return base.SQLInstanceID(0), ctx.Err()
return sqlinstance.InstanceInfo{}, ctx.Err()
}

// getAvailableInstanceIDForRegion retrieves an available instance ID for the
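The visible pieces of CreateInstance suggest a claim-or-preallocate loop: try to claim one of the preallocated instance rows, and if the local region has run out (errNoPreallocatedRows), allocate a fresh batch of IDs and try again until the context expires. The following is a hedged sketch of that shape only, not the actual implementation; claimRow and preallocateRows are hypothetical stand-ins for assignInstance and the region-local allocation:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    var errNoPreallocatedRows = errors.New("no preallocated rows")

    // available simulates the pool of preallocated instance rows.
    var available = 0

    // claimRow is a hypothetical stand-in for assignInstance.
    func claimRow(ctx context.Context) (int, error) {
        if available == 0 {
            return 0, errNoPreallocatedRows
        }
        available--
        return 1, nil
    }

    // preallocateRows is a hypothetical stand-in for the region-local ID
    // allocation performed when the pool is exhausted.
    func preallocateRows(ctx context.Context) error {
        available = 10
        return nil
    }

    // claimOrPreallocate mirrors the loop shape sketched above: claim a
    // preallocated row if one exists, otherwise allocate a new batch and retry.
    func claimOrPreallocate(ctx context.Context) (int, error) {
        for ctx.Err() == nil {
            id, err := claimRow(ctx)
            if err == nil {
                return id, nil
            }
            if !errors.Is(err, errNoPreallocatedRows) {
                return 0, err
            }
            if err := preallocateRows(ctx); err != nil {
                return 0, err
            }
        }
        // If we exit here, it has to be because the context has expired.
        return 0, ctx.Err()
    }

    func main() {
        id, err := claimOrPreallocate(context.Background())
        fmt.Println(id, err)
    }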