// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package upgrades

import (
	"bytes"
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/upgrade"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// defaultPageSize controls how many ranges are paged in by default when
// iterating through all ranges in a cluster during any given upgrade. We
// pulled this number out of thin air(-ish). Let's consider a cluster with 50k
// ranges, with each range taking ~200ms. We're being somewhat conservative
// with the duration, but in a wide-area cluster, with large hops between the
// manager and the replicas, that could well be the case. Here's how long it'll
// take for various page sizes:
//
//	page size of 1   ~ 2h 46m
//	page size of 50  ~ 3m 20s
//	page size of 200 ~ 50s
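//
// (These figures are simply (50,000 ranges x ~200ms) / page size, which
// assumes the per-range work within a page largely overlaps.)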
const defaultPageSize = 200
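
// raftAppliedIndexTermMigration issues a Migrate request against every range
// in the cluster, paging through the range descriptors defaultPageSize at a
// time, and then asks every node to sync all of its engines so the migrated
// applied state is durable.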
func raftAppliedIndexTermMigration(
	ctx context.Context, cv clusterversion.ClusterVersion, deps upgrade.SystemDeps, _ *jobs.Job,
) error {
	var batchIdx, numMigratedRanges int
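	// init is invoked by IterateRangeDescriptors whenever the iteration
	// (re)starts, so the progress counters are reset if the scan has to be
	// retried from scratch.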
	init := func() { batchIdx, numMigratedRanges = 1, 0 }
	if err := deps.Cluster.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error {
		for _, desc := range descriptors {
			// NB: This is a bit of a wart. We want to reach the first range,
			// but we can't address the (local) StartKey. However, keys.LocalMax
			// is on r1, so we'll just use that instead to target r1.
			start, end := desc.StartKey, desc.EndKey
			if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 {
				start, _ = keys.Addr(keys.LocalMax)
			}
			if err := deps.DB.Migrate(ctx, start, end, cv.Version); err != nil {
				return err
			}
		}
		// TODO(irfansharif): Instead of logging this to the debug log, we
		// should insert these into a `system.upgrades` table for external
		// observability.
		numMigratedRanges += len(descriptors)
		log.Infof(ctx, "[batch %d/??] migrated %d ranges", batchIdx, numMigratedRanges)
		batchIdx++
		return nil
	}); err != nil {
		return err
	}
	log.Infof(ctx, "[batch %d/%d] migrated %d ranges", batchIdx, batchIdx, numMigratedRanges)

	// Make sure that all stores have synced. Since this is a below-raft
	// upgrade, syncing ensures that the applied state is flushed to disk.
	req := &serverpb.SyncAllEnginesRequest{}
	op := "flush-stores"
	return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
		_, err := client.SyncAllEngines(ctx, req)
		return err
	})
}
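
// postRaftAppliedIndexTermMigration is meant to run after
// raftAppliedIndexTermMigration; it asks every node to purge replicas that
// have not yet picked up the migrated state.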
func postRaftAppliedIndexTermMigration(
	ctx context.Context, cv clusterversion.ClusterVersion, deps upgrade.SystemDeps, _ *jobs.Job,
) error {
	// TODO(sumeer): this is copied from postTruncatedStateMigration. In
	// comparison, postSeparatedIntentsMigration iterates over ranges and issues
	// a noop below-raft upgrade. I am not clear on why there is a difference;
	// get this clarified.
	//
	// Purge all replicas that haven't been migrated to use the unreplicated
	// truncated state and the range applied state.
	truncStateVersion := clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration)
	req := &serverpb.PurgeOutdatedReplicasRequest{Version: &truncStateVersion}
	op := fmt.Sprintf("purge-outdated-replicas=%s", req.Version)
	return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
		_, err := client.PurgeOutdatedReplicas(ctx, req)
		return err
	})
}