Skip to content

Commit

Permalink
server,migration: generate EveryNode req/resp helpers
Browse files Browse the repository at this point in the history
We expect to add multiple req/resp types as individual operations for
the EveryNode RPC. Each of these operations will correspond to a
"primitive" of sorts for the (long running) migrations infrastructure.
It's a bit cumbersome to wield this nested union type, so we
autogenerate helper code to do it for us. We take precedence from
api.proto and all the very many batch request/responses.

Release note: None
  • Loading branch information
irfansharif committed Oct 29, 2020
1 parent 9ccb3f9 commit c260c85
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 51 deletions.
1 change: 1 addition & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ go_library(
"//pkg/util/log",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/cockroachdb/logtags",
"//vendor/github.com/cockroachdb/redact",
],
)
30 changes: 11 additions & 19 deletions pkg/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

// Migration defines a program to be executed once every node in the cluster is
Expand Down Expand Up @@ -118,27 +119,26 @@ func (h *Helper) Log(event string) error {
return nil
}

// EveryNode lets migrations execute the given EveryNodeRequest across every
// node in the cluster.
func (h *Helper) EveryNode(ctx context.Context, req EveryNodeRequest) error {
// EveryNode lets migrations execute the given EveryNodeOp across every node in
// the cluster.
func (h *Helper) EveryNode(ctx context.Context, op serverpb.EveryNodeOp) error {
nodeIDs, err := h.RequiredNodes(ctx)
if err != nil {
return err
}

// TODO(irfansharif): We can/should send out these RPCs in parallel.
log.Infof(ctx, "executing req=%s on nodes=%s", req, nodeIDs)
log.Infof(ctx, "executing op=%s on nodes=%s", redact.Safe(op.Op()), redact.Safe(nodeIDs))
for _, nodeID := range nodeIDs {
conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
}

// TODO(irfansharif): This typecasting below is busted. We'll need
// wrapping/unwrapping code around the EveryNodeRequest internal union
// type.
req := &serverpb.EveryNodeRequest{}
req.Request.SetInner(op)
client := serverpb.NewAdminClient(conn)
if _, err := client.EveryNode(ctx, req.(*serverpb.EveryNodeRequest)); err != nil {
if _, err := client.EveryNode(ctx, req); err != nil {
return err
}
}
Expand All @@ -149,19 +149,12 @@ func (h *Helper) EveryNode(ctx context.Context, req EveryNodeRequest) error {
return nil
}

// EveryNodeRequest is an interface only satisfied by valid request types to the
// EveryNode RPC.
//
// TODO(irfansharif): Make this so. Should probably be defined in
// pkg/server/serverpb.
type EveryNodeRequest interface{}

// MigrateTo runs the set of migrations required to upgrade the cluster version
// to the provided target version.
func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error {
// TODO(irfansharif): Should we inject every ctx here with specific labels
// for each migration, so they log distinctly? Do we need an AmbientContext?
ctx = logtags.AddTag(ctx, "migration-mgr", nil)
ctx = logtags.AddTag(ctx, "migration-mgr", redact.Safe(targetV))

// TODO(irfansharif): We'll need to acquire a lease here and refresh it
// throughout during the migration to ensure mutual exclusion.
Expand All @@ -178,7 +171,6 @@ func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error
// TODO(irfansharif): After determining the last completed migration, if
// any, we'll be want to assemble the list of remaining migrations to step
// through to get to targetV. For now we've hard-coded this list.
_ = targetV
vs := []roachpb.Version{
cv.VersionByKey(cv.VersionNoopMigration),
}
Expand All @@ -190,8 +182,8 @@ func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error
// return. The migration associated with the specific version can assume
// that every node in the cluster has the corresponding version
// activated.
req := serverpb.AckClusterVersionRequest{Version: &version}
if err := h.EveryNode(ctx, req); err != nil {
op := &serverpb.AckClusterVersionRequest{Version: &version}
if err := h.EveryNode(ctx, op); err != nil {
return err
}

Expand Down
72 changes: 40 additions & 32 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
gwutil "github.com/grpc-ecosystem/grpc-gateway/utilities"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1758,43 +1760,49 @@ func (s *adminServer) Decommission(
func (s *adminServer) EveryNode(
ctx context.Context, req *serverpb.EveryNodeRequest,
) (*serverpb.EveryNodeResponse, error) {
// TODO(irfansharif): We should write up something similar to the code
// generator for roachpb/batch_generated.go. Ditto for the response, this is
// pretty unwieldy to write by hand. Right now we're unconditionally pulling
// out the AckPendingVersion request, seeing as how it's the only one
// defined.
ackReq := req.Request.GetAckClusterVersion()
if ackReq == nil {
return nil, errors.Newf("unsupported req=%s", req.Request)
}

prevCV, err := kvserver.SynthesizeClusterVersionFromEngines(
ctx, s.server.engines, s.server.ClusterSettings().Version.BinaryVersion(),
s.server.ClusterSettings().Version.BinaryMinSupportedVersion(),
)
if err != nil {
return nil, err
}
op := req.Request.GetInner()
ctx = logtags.AddTag(ctx, "every-node", redact.Safe(op.Op()))

switch op.(type) {
case *serverpb.AckClusterVersionRequest:
versionSetting := s.server.ClusterSettings().Version
prevCV, err := kvserver.SynthesizeClusterVersionFromEngines(
ctx, s.server.engines, versionSetting.BinaryVersion(),
versionSetting.BinaryMinSupportedVersion(),
)
if err != nil {
return nil, err
}

log.Infof(ctx, "received op=ack-cluster-version, args=%s, prev=%s", ackReq.Version, prevCV.Version)
ackReq := req.Request.GetAckClusterVersion()
newCV := clusterversion.ClusterVersion{Version: *ackReq.Version}
log.Infof(ctx, "received args=%s, prev=%s", newCV, prevCV)

if prevCV.Version.Less(*ackReq.Version) {
cv := clusterversion.ClusterVersion{Version: *ackReq.Version}
if err := kvserver.WriteClusterVersionToEngines(ctx, s.server.engines, cv); err != nil {
return nil, err
// TODO(irfansharif): We'll want to serialize through a mutex here to
// make sure we're not clobbering the disk state if the RPC is being
// invoked concurrently.
if prevCV.Version.Less(*ackReq.Version) {
if err := kvserver.WriteClusterVersionToEngines(ctx, s.server.engines, newCV); err != nil {
return nil, err
}
log.Infof(ctx, "active cluster version persisted is now %s (up from %s)", newCV, prevCV)

// TODO(irfansharif): We'll eventually want to bump the local version
// gate here. On 21.1 nodes we'll no longer be using gossip to propagate
// cluster version bumps. We'll still have probably disseminate it
// through gossip (do we actually have to?), but we won't listen to it.
//
// _ = s.server.ClusterSettings().<...>.SetActiveVersion(ctx, newCV)
// log.Infof(ctx, "active cluster version setting is now %s (up from %s)", newCV, prevCV)
}

// TODO(irfansharif): We'll eventually want to bump the local version
// gate here. On 21.1 nodes we'll no longer be using gossip to propagate
// cluster version bumps. We'll still have probably disseminate it
// through gossip (do we actually have to?), but we won't listen to it.
//
// _ = s.server.ClusterSettings().<...>.Set(ctx, ackReq.Version)
ackResp := &serverpb.AckClusterVersionResponse{}
resp := &serverpb.EveryNodeResponse{}
resp.Response.SetInner(ackResp)
return resp, nil
default:
return nil, errors.Newf("unrecognized op=%s", op.Op())
}

ackResp := &serverpb.AckClusterVersionResponse{}
ackRespU := &serverpb.EveryNodeResponseUnion_AckClusterVersion{AckClusterVersion: ackResp}
return &serverpb.EveryNodeResponse{Response: serverpb.EveryNodeResponseUnion{Value: ackRespU}}, nil
}

// DataDistribution returns a count of replicas on each node for each table.
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/serverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ go_library(
"admin.pb.gw.go",
"authentication.pb.go",
"authentication.pb.gw.go",
"everynode.go",
"everynode_generated.go",
"init.pb.go",
"status.go",
"status.pb.go",
Expand Down
29 changes: 29 additions & 0 deletions pkg/server/serverpb/everynode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package serverpb

//go:generate go run -tags gen-everynode gen_everynode.go

// EveryNodeOp is an interface satisfied by all allowable EveryNode requests.
type EveryNodeOp interface {
Op() string
}

// EveryNodeOpResp is an interface satisfied by all allowable EveryNode responses.
type EveryNodeOpResp interface {
everyNodeOpResp()
}

// Op implements the EveryNodeOp interface.
func (a *AckClusterVersionRequest) Op() string { return "ack-cluster-version" }

// everyNodeOpResp implements the EveryNodeOpResp interface.
func (a *AckClusterVersionResponse) everyNodeOpResp() {}
50 changes: 50 additions & 0 deletions pkg/server/serverpb/everynode_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c260c85

Please sign in to comment.