Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migration: introduce primitives for below-raft migrations #58088

Merged
merged 1 commit into from
Dec 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-16</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
28 changes: 28 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,22 @@ const (
VirtualComputedColumns
// CPutInline is conditional put support for inline values.
CPutInline
// ReplicaVersions enables the versioning of Replica state.
ReplicaVersions
// TruncatedAndRangeAppliedStateMigration is part of the migration to stop
// using the legacy truncated state within KV. After the migration, we'll be
// using the unreplicated truncated state and the RangeAppliedState on all
// ranges. Callers that wish to assert on there no longer being any legacy
// will be able to do so after PostTruncatedAndRangeAppliedStateMigration is
// active. This lets remove any holdover code handling the possibility of
// replicated truncated state in 21.2.
//
// TODO(irfansharif): Do the above in 21.2.
TruncatedAndRangeAppliedStateMigration
// PostTruncatedAndRangeAppliedStateMigration is used to purge all replicas
// using the replicated legacy TruncatedState. It's also used in asserting
// that no replicated truncated state representation is found.
PostTruncatedAndRangeAppliedStateMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -340,6 +356,18 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: CPutInline,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 10},
},
{
Key: ReplicaVersions,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 12},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 14},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 16},
},

// Step (2): Add new versions here.
})
Expand Down
7 changes: 5 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ var (
// LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease
// index.
LocalLeaseAppliedIndexLegacySuffix = []byte("rlla")
// LocalRangeVersionSuffix is the suffix for the range version.
LocalRangeVersionSuffix = []byte("rver")
// LocalRangeStatsLegacySuffix is the suffix for range statistics.
LocalRangeStatsLegacySuffix = []byte("stat")
// localTxnSpanGCThresholdSuffix is DEPRECATED and remains to prevent reuse.
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var _ = [...]interface{}{
RaftTruncatedStateLegacyKey, // "rftt"
RangeLeaseKey, // "rll-"
LeaseAppliedIndexLegacyKey, // "rlla"
RangeVersionKey, // "rver"
RangeStatsLegacyKey, // "stat"

// 2. Unreplicated range-ID local keys: These contain metadata that
Expand Down
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func RangeLastGCKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLastGCKey()
}

// RangeVersionKey returns a system-local for the range version.
func RangeVersionKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeVersionKey()
}

// MakeRangeIDUnreplicatedPrefix creates a range-local key prefix from
// rangeID for all unreplicated data.
func MakeRangeIDUnreplicatedPrefix(rangeID roachpb.RangeID) roachpb.Key {
Expand Down Expand Up @@ -964,6 +969,11 @@ func (b RangeIDPrefixBuf) RangeLastGCKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeLastGCSuffix...)
}

// RangeVersionKey returns a system-local key for the range version.
func (b RangeIDPrefixBuf) RangeVersionKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeVersionSuffix...)
}

// RangeTombstoneKey returns a system-local key for a range tombstone.
func (b RangeIDPrefixBuf) RangeTombstoneKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRangeTombstoneSuffix...)
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var (
{name: "RangeLease", suffix: LocalRangeLeaseSuffix},
{name: "RangeStats", suffix: LocalRangeStatsLegacySuffix},
{name: "RangeLastGC", suffix: LocalRangeLastGCSuffix},
{name: "RangeVersion", suffix: LocalRangeVersionSuffix},
}

rangeSuffixDict = []struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease", revertSupportUnknown},
{keys.RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats", revertSupportUnknown},
{keys.RangeLastGCKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLastGC", revertSupportUnknown},
{keys.RangeVersionKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeVersion", revertSupportUnknown},

{keys.RaftHardStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftHardState", revertSupportUnknown},
{keys.RangeTombstoneKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RangeTombstone", revertSupportUnknown},
Expand Down
37 changes: 29 additions & 8 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,10 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.DeleteRequest:
row := &result.Rows[k]
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
if result.Err == nil {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
}

default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.EndTxnRequest:
Expand All @@ -265,6 +257,12 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.ImportRequest:
case *roachpb.AdminScatterRequest:
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}
}
// Fill up the resume span.
if result.Err == nil && reply != nil && reply.Header().ResumeSpan != nil {
Expand Down Expand Up @@ -786,3 +784,26 @@ func (b *Batch) addSSTable(
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.MigrateRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
Version: version,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
10 changes: 10 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,16 @@ func (db *DB) AddSSTable(
return getOneErr(db.Run(ctx, b), b)
}

// Migrate is used instruct all ranges overlapping with the provided keyspace to
// exercise any relevant (below-raft) migrations in order for its range state to
// conform to what's needed by the specified version. It's a core primitive used
// in our migrations infrastructure to phase out legacy code below raft.
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
b := &Batch{}
b.migrate(begin, end, version)
return getOneErr(db.Run(ctx, b), b)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ go_test(
"client_lease_test.go",
"client_merge_test.go",
"client_metrics_test.go",
"client_migration_test.go",
"client_protectedts_test.go",
"client_raft_helpers_test.go",
"client_raft_log_queue_test.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"cmd_lease_request.go",
"cmd_lease_transfer.go",
"cmd_merge.go",
"cmd_migrate.go",
"cmd_push_txn.go",
"cmd_put.go",
"cmd_query_intent.go",
Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ func splitTriggerHelper(
// initial state. Additionally, since bothDeltaMS is tracking writes to
// both sides, we need to update it as well.
{
// Various pieces of code rely on a replica's lease never being unitialized,
// Various pieces of code rely on a replica's lease never being uninitialized,
// but it's more than that - it ensures that we properly initialize the
// timestamp cache, which is only populated on the lease holder, from that
// of the original Range. We found out about a regression here the hard way
Expand All @@ -918,8 +918,9 @@ func splitTriggerHelper(
// - node two can illegally propose a write to 'd' at a lower timestamp.
//
// TODO(tschottdorf): why would this use r.store.Engine() and not the
// batch?
leftLease, err := MakeStateLoader(rec).LoadLease(ctx, rec.Engine())
// batch? We do the same thing for other usages of the state loader.
sl := MakeStateLoader(rec)
leftLease, err := sl.LoadLease(ctx, rec.Engine())
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load lease")
}
Expand All @@ -936,8 +937,7 @@ func splitTriggerHelper(
}
rightLease := leftLease
rightLease.Replica = replica

gcThreshold, err := MakeStateLoader(rec).LoadGCThreshold(ctx, rec.Engine())
gcThreshold, err := sl.LoadGCThreshold(ctx, rec.Engine())
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}
Expand Down Expand Up @@ -968,6 +968,11 @@ func splitTriggerHelper(
truncStateType = stateloader.TruncatedStateLegacyReplicated
}

replicaVersion, err := sl.LoadVersion(ctx, rec.Engine())
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}

// Writing the initial state is subtle since this also seeds the Raft
// group. It becomes more subtle due to proposer-evaluated Raft.
//
Expand Down Expand Up @@ -1000,7 +1005,7 @@ func splitTriggerHelper(

*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, truncStateType,
*gcThreshold, truncStateType, replicaVersion,
)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
Expand Down
Loading