Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Avoid adding all replicas at once in RelocateRange #29684

Merged
merged 2 commits into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,15 @@ func (b *Batch) fillResults(ctx context.Context) {
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.BeginTransactionRequest:
case *roachpb.EndTransactionRequest:
case *roachpb.AdminMergeRequest:
case *roachpb.AdminSplitRequest:
case *roachpb.AdminTransferLeaseRequest:
case *roachpb.AdminChangeReplicasRequest:
case *roachpb.AdminRelocateRangeRequest:
case *roachpb.HeartbeatTxnRequest:
case *roachpb.GCRequest:
case *roachpb.LeaseInfoRequest:
Expand Down Expand Up @@ -636,6 +637,24 @@ func (b *Batch) adminChangeReplicas(
b.initResult(1, 0, notRaw, nil)
}

// adminRelocateRange is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.ReplicationTarget) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.AdminRelocateRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Targets: targets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// writeBatch is only exported on DB.
func (b *Batch) writeBatch(s, e interface{}, data []byte) {
begin, err := marshalKey(s)
Expand Down
10 changes: 10 additions & 0 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,16 @@ func (db *DB) AdminChangeReplicas(
return getOneErr(db.Run(ctx, b), b)
}

// AdminRelocateRange relocates the replicas for a range onto the specified
// list of stores.
func (db *DB) AdminRelocateRange(
ctx context.Context, key interface{}, targets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
b.adminRelocateRange(key, targets)
return getOneErr(db.Run(ctx, b), b)
}

// WriteBatch applies the operations encoded in a BatchRepr, which is the
// serialized form of a RocksDB Batch. The command cannot span Ranges and must
// be run on an empty keyrange.
Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ func (*AdminTransferLeaseRequest) Method() Method { return AdminTransferLease }
// Method implements the Request interface.
func (*AdminChangeReplicasRequest) Method() Method { return AdminChangeReplicas }

// Method implements the Request interface.
func (*AdminRelocateRangeRequest) Method() Method { return AdminRelocateRange }

// Method implements the Request interface.
func (*HeartbeatTxnRequest) Method() Method { return HeartbeatTxn }

Expand Down Expand Up @@ -680,6 +683,12 @@ func (acrr *AdminChangeReplicasRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (acrr *AdminRelocateRangeRequest) ShallowCopy() Request {
shallowCopy := *acrr
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (htr *HeartbeatTxnRequest) ShallowCopy() Request {
shallowCopy := *htr
Expand Down Expand Up @@ -1020,6 +1029,7 @@ func (*AdminSplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() int { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone }
func (*AdminChangeReplicasRequest) flags() int { return isAdmin | isAlone }
func (*AdminRelocateRangeRequest) flags() int { return isAdmin | isAlone }
func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn }
func (*GCRequest) flags() int { return isWrite | isRange }
func (*PushTxnRequest) flags() int { return isWrite | isAlone }
Expand Down
2,140 changes: 1,328 additions & 812 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,21 @@ message AdminChangeReplicasResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// An AdminRelocateRangeRequest is the argument to the AdminRelocateRange()
// method. Relocates the replicas for a range to the specified target stores.
// The first store in the list of targets becomes the new leaseholder.
message AdminRelocateRangeRequest {
option (gogoproto.equal) = true;

RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
repeated ReplicationTarget targets = 2 [(gogoproto.nullable) = false];
// TODO(a-robinson): Add "reason"/"details" string fields?
}

message AdminRelocateRangeResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// A HeartbeatTxnRequest is arguments to the HeartbeatTxn()
// method. It's sent by transaction coordinators to let the system
// know that the transaction is still ongoing. Note that this
Expand Down Expand Up @@ -1349,6 +1364,7 @@ message RequestUnion {
AdminMergeRequest admin_merge = 11;
AdminTransferLeaseRequest admin_transfer_lease = 29;
AdminChangeReplicasRequest admin_change_replicas = 35;
AdminRelocateRangeRequest admin_relocate_range = 45;
HeartbeatTxnRequest heartbeat_txn = 12;
GCRequest gc = 13;
PushTxnRequest push_txn = 14;
Expand Down Expand Up @@ -1397,6 +1413,7 @@ message ResponseUnion {
AdminMergeResponse admin_merge = 11;
AdminTransferLeaseResponse admin_transfer_lease = 29;
AdminChangeReplicasResponse admin_change_replicas = 35;
AdminRelocateRangeResponse admin_relocate_range = 45;
HeartbeatTxnResponse heartbeat_txn = 12;
GCResponse gc = 13;
PushTxnResponse push_txn = 14;
Expand Down
Loading