Skip to content

Commit

Permalink
Merge pull request #7904 from arjunravinarayan/replica_desc_refactor
Browse files Browse the repository at this point in the history
storage: prepend replica descriptors in panics
all logs and errors with it. Prepend panics as well.
storage: move rangeDesc out of replica.mu, use in Desc()
readable copy of the RangeDescriptor. replica.Desc() could use this as
well (so as to not lock replica.mu), but this change was
conservatively deferred. However, with the issue of replica.Desc()
acquiring locks being raised in #7888, and after discussion with
@bdarnell, it should be safe to refactor replica.Desc() as well.
  • Loading branch information
Arjun Narayan authored Jul 20, 2016
2 parents 4e391fb + 975ee58 commit ec00732
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
46 changes: 27 additions & 19 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,18 @@ type Replica struct {
// RWMutex.
readOnlyCmdMu sync.RWMutex

// rangeDesc is a *RangeDescriptor that can be atomically read from in
// replica.Desc() without needing to acquire the replica.mu lock. All
// updates to state.Desc should be duplicated here
rangeDesc atomic.Value

mu struct {
// Protects all fields in the mu struct.
sync.Mutex
// Has the replica been destroyed.
destroyed error
// The state of the Raft state machine.
state storagebase.ReplicaState
// rangeDesc is a *RangeDescriptor that can be atomically read from. All
// updates to state.Desc should be duplicated here (as is done in
// updateRangeDescriptorLocked()) so that it can be read without acquiring
// the lock.
rangeDesc atomic.Value
// Counter used for assigning lease indexes for proposals.
lastAssignedLeaseIndex uint64
// Enforces at most one command is running per key(s).
Expand Down Expand Up @@ -363,7 +363,7 @@ func (r *Replica) newReplicaInner(desc *roachpb.RangeDescriptor, clock *hlc.Cloc
if r.mu.state, err = loadState(r.store.Engine(), desc); err != nil {
return err
}
r.mu.rangeDesc.Store(r.mu.state.Desc)
r.rangeDesc.Store(r.mu.state.Desc)

r.mu.lastIndex, err = loadLastIndex(r.store.Engine(), r.RangeID)
if err != nil {
Expand Down Expand Up @@ -392,7 +392,7 @@ func (r *Replica) newReplicaInner(desc *roachpb.RangeDescriptor, clock *hlc.Cloc

// String returns a string representation of the range.
func (r *Replica) String() string {
desc := r.mu.rangeDesc.Load().(*roachpb.RangeDescriptor)
desc := r.Desc()
return fmt.Sprintf("%s range=%d [%s-%s)", r.store,
desc.RangeID, desc.StartKey, desc.EndKey)
}
Expand Down Expand Up @@ -623,9 +623,7 @@ func (r *Replica) isInitializedLocked() bool {

// Desc returns the range's descriptor.
func (r *Replica) Desc() *roachpb.RangeDescriptor {
r.mu.Lock()
defer r.mu.Unlock()
return r.mu.state.Desc
return r.rangeDesc.Load().(*roachpb.RangeDescriptor)
}

// setDesc atomically sets the range's descriptor. This method calls
Expand Down Expand Up @@ -653,11 +651,12 @@ func (r *Replica) setDescWithoutProcessUpdate(desc *roachpb.RangeDescriptor) {
// setDescWithoutProcessUpdateLocked requires that the replica lock is held.
func (r *Replica) setDescWithoutProcessUpdateLocked(desc *roachpb.RangeDescriptor) {
if desc.RangeID != r.RangeID {
panic(fmt.Sprintf("range descriptor ID (%d) does not match replica's range ID (%d)",
desc.RangeID, r.RangeID))
r.panicf("range descriptor ID (%d) does not match replica's range ID (%d)",
desc.RangeID, r.RangeID)
}

r.mu.state.Desc = desc
r.mu.rangeDesc.Store(desc)
r.rangeDesc.Store(desc)
}

// GetReplicaDescriptor returns the replica for this range from the range
Expand Down Expand Up @@ -800,7 +799,7 @@ func (r *Replica) assertState(reader engine.Reader) {
func (r *Replica) assertStateLocked(reader engine.Reader) {
diskState, err := loadState(reader, r.mu.state.Desc)
if err != nil {
panic(err)
r.panic(err)
}
if !reflect.DeepEqual(diskState, r.mu.state) {
log.Fatalf("%s: on-disk and in-memory state diverged:\n%+v\n%+v", r, diskState, r.mu.state)
Expand Down Expand Up @@ -839,9 +838,9 @@ func (r *Replica) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.B
// empty batch; shouldn't happen (we could handle it, but it hints
// at someone doing weird things, and once we drop the key range
// from the header it won't be clear how to route those requests).
panic("empty batch")
r.panicf("empty batch")
} else {
panic(fmt.Sprintf("don't know how to handle command %s", ba))
r.panicf("don't know how to handle command %s", ba)
}
if _, ok := pErr.GetDetail().(*roachpb.RaftGroupDeletedError); ok {
// This error needs to be converted appropriately so that
Expand Down Expand Up @@ -1702,7 +1701,7 @@ func (r *Replica) sendRaftMessage(msg raftpb.Message) {
raftGroup.ReportUnreachable(msg.To)
return nil
}); err != nil {
panic(err)
r.panic(err)
}
}
}
Expand All @@ -1717,7 +1716,7 @@ func (r *Replica) reportSnapshotStatus(to uint64, snapErr error) {
raftGroup.ReportSnapshot(to, snapStatus)
return nil
}); err != nil {
panic(err)
r.panic(err)
}
}

Expand Down Expand Up @@ -2356,7 +2355,8 @@ func (r *Replica) executeBatch(
if cReply, ok := reply.(roachpb.Countable); ok {
retResults := cReply.Count()
if retResults > remScanResults {
panic(fmt.Sprintf("received %d results, limit was %d", retResults, remScanResults))
r.panicf("received %d results, limit was %d",
retResults, remScanResults)
}
remScanResults -= retResults
}
Expand Down Expand Up @@ -2642,3 +2642,11 @@ func (r *Replica) maybeAddToRaftLogQueue(appliedIndex uint64) {
r.store.raftLogQueue.MaybeAdd(r, r.store.Clock().Now())
}
}

func (r *Replica) panic(err error) {
panic(r.String() + ": " + err.Error())
}

func (r *Replica) panicf(format string, vals ...interface{}) {
panic(r.String() + ": " + fmt.Sprintf(format, vals...))
}
2 changes: 1 addition & 1 deletion storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func TestReplicaContains(t *testing.T) {
// This test really only needs a hollow shell of a Replica.
r := &Replica{}
r.mu.state.Desc = desc
r.mu.rangeDesc.Store(desc)
r.rangeDesc.Store(desc)

if statsKey := keys.RangeStatsKey(desc.RangeID); !r.ContainsKey(statsKey) {
t.Errorf("expected range to contain range stats key %q", statsKey)
Expand Down

0 comments on commit ec00732

Please sign in to comment.