Skip to content

Commit

Permalink
storage: respect context deadline in idAllocator.Allocate
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
nvanbenschoten committed Dec 19, 2017
1 parent 5fe4455 commit 6adf047
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 18 deletions.
16 changes: 10 additions & 6 deletions pkg/storage/id_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,19 @@ func newIDAllocator(
}

// Allocate allocates a new ID from the global KV DB.
func (ia *idAllocator) Allocate() (uint32, error) {
func (ia *idAllocator) Allocate(ctx context.Context) (uint32, error) {
ia.once.Do(ia.start)

id := <-ia.ids
// when the channel is closed, the zero value is returned.
if id == 0 {
return id, errors.Errorf("could not allocate ID; system is draining")
select {
case id := <-ia.ids:
// when the channel is closed, the zero value is returned.
if id == 0 {
return id, errors.Errorf("could not allocate ID; system is draining")
}
return id, nil
case <-ctx.Done():
return 0, ctx.Err()
}
return id, nil
}

func (ia *idAllocator) start() {
Expand Down
24 changes: 17 additions & 7 deletions pkg/storage/id_alloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestIDAllocator(t *testing.T) {
for i := 0; i < maxI; i++ {
go func() {
for j := 0; j < maxJ; j++ {
id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
errChan <- err
allocd <- id
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIDAllocatorNegativeValue(t *testing.T) {
if err != nil {
t.Errorf("failed to create IDAllocator: %v", err)
}
value, err := idAlloc.Allocate()
value, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
t.Errorf("failed to create IDAllocator: %v", err)
}

firstID, err := idAlloc.Allocate()
firstID, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
Expand All @@ -172,7 +172,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
// Should be able to get the allocated IDs, and there will be one
// background allocateBlock to get ID continuously.
for i := 0; i < 8; i++ {
id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
Expand All @@ -194,7 +194,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
errChan <- nil
}

id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
errChan <- err
allocd <- id
}()
Expand All @@ -207,6 +207,16 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
}
}

// Attempt a few allocations with a context timeout while allocations are
// blocked. All attempts should hit a context deadline exceeded error.
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
for i := 0; i < routines; i++ {
id, err := idAlloc.Allocate(ctx)
if id != 0 || err != context.DeadlineExceeded {
t.Errorf("expected context cancellation, found id=%d, err=%v", id, err)
}
}

// Make the IDAllocator valid again.
idAlloc.idKey.Store(keys.RangeIDGenerator)
// Check if the blocked allocations return expected ID.
Expand All @@ -226,7 +236,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {

// Check if the following allocations return expected ID.
for i := 0; i < routines; i++ {
id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -254,7 +264,7 @@ func TestAllocateWithStopper(t *testing.T) {
return idAlloc
}()

if _, err := idAlloc.Allocate(); !testutils.IsError(err, "system is draining") {
if _, err := idAlloc.Allocate(context.Background()); !testutils.IsError(err, "system is draining") {
t.Errorf("unexpected error: %v", err)
}
}
2 changes: 1 addition & 1 deletion pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2720,7 +2720,7 @@ func (r *Replica) adminSplitWithDescriptor(
log.Event(ctx, "found split key")

// Create right hand side range descriptor with the newly-allocated Range ID.
rightDesc, err := r.store.NewRangeDescriptor(splitKey, desc.EndKey, desc.Replicas)
rightDesc, err := r.store.NewRangeDescriptor(ctx, splitKey, desc.EndKey, desc.Replicas)
if err != nil {
return reply, true,
roachpb.NewErrorf("unable to allocate right hand side range descriptor: %s", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1778,9 +1778,9 @@ func (s *Store) IsDraining() bool {
// range ID and returns a RangeDescriptor whose Replicas are a copy
// of the supplied replicas slice, with appropriate ReplicaIDs assigned.
func (s *Store) NewRangeDescriptor(
start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor,
ctx context.Context, start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor,
) (*roachpb.RangeDescriptor, error) {
id, err := s.rangeIDAlloc.Allocate()
id, err := s.rangeIDAlloc.Allocate(ctx)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,7 +1002,8 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep
if repl == nil {
t.Fatalf("couldn't lookup range for key %q", key)
}
desc, err := store.NewRangeDescriptor(splitKey, repl.Desc().EndKey, repl.Desc().Replicas)
desc, err := store.NewRangeDescriptor(
context.Background(), splitKey, repl.Desc().EndKey, repl.Desc().Replicas)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1062,7 +1063,8 @@ func TestStoreRangeIDAllocation(t *testing.T) {
// to rangeIDAllocCount * 3 + 1.
for i := 0; i < rangeIDAllocCount*3; i++ {
replicas := []roachpb.ReplicaDescriptor{{StoreID: store.StoreID()}}
desc, err := store.NewRangeDescriptor(roachpb.RKey(fmt.Sprintf("%03d", i)), roachpb.RKey(fmt.Sprintf("%03d", i+1)), replicas)
desc, err := store.NewRangeDescriptor(context.Background(),
roachpb.RKey(fmt.Sprintf("%03d", i)), roachpb.RKey(fmt.Sprintf("%03d", i+1)), replicas)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 6adf047

Please sign in to comment.