diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index dace22c5020a..70393656b9e9 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -17,5 +17,6 @@ go_library( "//pkg/util/log", "//vendor/github.com/cockroachdb/errors", "//vendor/github.com/cockroachdb/logtags", + "//vendor/github.com/cockroachdb/redact", ], ) diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go index e624c21058a6..9a4c8459e684 100644 --- a/pkg/migration/manager.go +++ b/pkg/migration/manager.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" + "github.com/cockroachdb/redact" ) // Migration defines a program to be executed once every node in the cluster is @@ -118,27 +119,26 @@ func (h *Helper) Log(event string) error { return nil } -// EveryNode lets migrations execute the given EveryNodeRequest across every -// node in the cluster. -func (h *Helper) EveryNode(ctx context.Context, req EveryNodeRequest) error { +// EveryNode lets migrations execute the given EveryNodeOp across every node in +// the cluster. +func (h *Helper) EveryNode(ctx context.Context, op serverpb.EveryNodeOp) error { nodeIDs, err := h.RequiredNodes(ctx) if err != nil { return err } // TODO(irfansharif): We can/should send out these RPCs in parallel. - log.Infof(ctx, "executing req=%s on nodes=%s", req, nodeIDs) + log.Infof(ctx, "executing op=%s on nodes=%s", redact.Safe(op.Op()), redact.Safe(nodeIDs)) for _, nodeID := range nodeIDs { conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { return err } - // TODO(irfansharif): This typecasting below is busted. We'll need - // wrapping/unwrapping code around the EveryNodeRequest internal union - // type. + req := &serverpb.EveryNodeRequest{} + req.Request.SetInner(op) client := serverpb.NewAdminClient(conn) - if _, err := client.EveryNode(ctx, req.(*serverpb.EveryNodeRequest)); err != nil { + if _, err := client.EveryNode(ctx, req); err != nil { return err } } @@ -149,19 +149,12 @@ func (h *Helper) EveryNode(ctx context.Context, req EveryNodeRequest) error { return nil } -// EveryNodeRequest is an interface only satisfied by valid request types to the -// EveryNode RPC. -// -// TODO(irfansharif): Make this so. Should probably be defined in -// pkg/server/serverpb. -type EveryNodeRequest interface{} - // MigrateTo runs the set of migrations required to upgrade the cluster version // to the provided target version. func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error { // TODO(irfansharif): Should we inject every ctx here with specific labels // for each migration, so they log distinctly? Do we need an AmbientContext? - ctx = logtags.AddTag(ctx, "migration-mgr", nil) + ctx = logtags.AddTag(ctx, "migration-mgr", redact.Safe(targetV)) // TODO(irfansharif): We'll need to acquire a lease here and refresh it // throughout during the migration to ensure mutual exclusion. @@ -178,7 +171,6 @@ func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error // TODO(irfansharif): After determining the last completed migration, if // any, we'll be want to assemble the list of remaining migrations to step // through to get to targetV. For now we've hard-coded this list. - _ = targetV vs := []roachpb.Version{ cv.VersionByKey(cv.VersionNoopMigration), } @@ -190,8 +182,8 @@ func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error // return. The migration associated with the specific version can assume // that every node in the cluster has the corresponding version // activated. - req := serverpb.AckClusterVersionRequest{Version: &version} - if err := h.EveryNode(ctx, req); err != nil { + op := &serverpb.AckClusterVersionRequest{Version: &version} + if err := h.EveryNode(ctx, op); err != nil { return err } diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 877aec1ee487..9e8d10412b4f 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -54,6 +54,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" + "github.com/cockroachdb/redact" gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" gwutil "github.com/grpc-ecosystem/grpc-gateway/utilities" "google.golang.org/grpc" @@ -1758,43 +1760,49 @@ func (s *adminServer) Decommission( func (s *adminServer) EveryNode( ctx context.Context, req *serverpb.EveryNodeRequest, ) (*serverpb.EveryNodeResponse, error) { - // TODO(irfansharif): We should write up something similar to the code - // generator for roachpb/batch_generated.go. Ditto for the response, this is - // pretty unwieldy to write by hand. Right now we're unconditionally pulling - // out the AckPendingVersion request, seeing as how it's the only one - // defined. - ackReq := req.Request.GetAckClusterVersion() - if ackReq == nil { - return nil, errors.Newf("unsupported req=%s", req.Request) - } - - prevCV, err := kvserver.SynthesizeClusterVersionFromEngines( - ctx, s.server.engines, s.server.ClusterSettings().Version.BinaryVersion(), - s.server.ClusterSettings().Version.BinaryMinSupportedVersion(), - ) - if err != nil { - return nil, err - } + op := req.Request.GetInner() + ctx = logtags.AddTag(ctx, "every-node", redact.Safe(op.Op())) + + switch op.(type) { + case *serverpb.AckClusterVersionRequest: + versionSetting := s.server.ClusterSettings().Version + prevCV, err := kvserver.SynthesizeClusterVersionFromEngines( + ctx, s.server.engines, versionSetting.BinaryVersion(), + versionSetting.BinaryMinSupportedVersion(), + ) + if err != nil { + return nil, err + } - log.Infof(ctx, "received op=ack-cluster-version, args=%s, prev=%s", ackReq.Version, prevCV.Version) + ackReq := req.Request.GetAckClusterVersion() + newCV := clusterversion.ClusterVersion{Version: *ackReq.Version} + log.Infof(ctx, "received args=%s, prev=%s", newCV, prevCV) - if prevCV.Version.Less(*ackReq.Version) { - cv := clusterversion.ClusterVersion{Version: *ackReq.Version} - if err := kvserver.WriteClusterVersionToEngines(ctx, s.server.engines, cv); err != nil { - return nil, err + // TODO(irfansharif): We'll want to serialize through a mutex here to + // make sure we're not clobbering the disk state if the RPC is being + // invoked concurrently. + if prevCV.Version.Less(*ackReq.Version) { + if err := kvserver.WriteClusterVersionToEngines(ctx, s.server.engines, newCV); err != nil { + return nil, err + } + log.Infof(ctx, "active cluster version persisted is now %s (up from %s)", newCV, prevCV) + + // TODO(irfansharif): We'll eventually want to bump the local version + // gate here. On 21.1 nodes we'll no longer be using gossip to propagate + // cluster version bumps. We'll still have probably disseminate it + // through gossip (do we actually have to?), but we won't listen to it. + // + // _ = s.server.ClusterSettings().<...>.SetActiveVersion(ctx, newCV) + // log.Infof(ctx, "active cluster version setting is now %s (up from %s)", newCV, prevCV) } - // TODO(irfansharif): We'll eventually want to bump the local version - // gate here. On 21.1 nodes we'll no longer be using gossip to propagate - // cluster version bumps. We'll still have probably disseminate it - // through gossip (do we actually have to?), but we won't listen to it. - // - // _ = s.server.ClusterSettings().<...>.Set(ctx, ackReq.Version) + ackResp := &serverpb.AckClusterVersionResponse{} + resp := &serverpb.EveryNodeResponse{} + resp.Response.SetInner(ackResp) + return resp, nil + default: + return nil, errors.Newf("unrecognized op=%s", op.Op()) } - - ackResp := &serverpb.AckClusterVersionResponse{} - ackRespU := &serverpb.EveryNodeResponseUnion_AckClusterVersion{AckClusterVersion: ackResp} - return &serverpb.EveryNodeResponse{Response: serverpb.EveryNodeResponseUnion{Value: ackRespU}}, nil } // DataDistribution returns a count of replicas on each node for each table. diff --git a/pkg/server/serverpb/BUILD.bazel b/pkg/server/serverpb/BUILD.bazel index d16f33ebb0aa..363323fa7547 100644 --- a/pkg/server/serverpb/BUILD.bazel +++ b/pkg/server/serverpb/BUILD.bazel @@ -8,6 +8,8 @@ go_library( "admin.pb.gw.go", "authentication.pb.go", "authentication.pb.gw.go", + "everynode.go", + "everynode_generated.go", "init.pb.go", "status.go", "status.pb.go", diff --git a/pkg/server/serverpb/everynode.go b/pkg/server/serverpb/everynode.go new file mode 100644 index 000000000000..e89cf24cc27b --- /dev/null +++ b/pkg/server/serverpb/everynode.go @@ -0,0 +1,29 @@ +// Copyright 2018 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package serverpb + +//go:generate go run -tags gen-everynode gen_everynode.go + +// EveryNodeOp is an interface satisfied by all allowable EveryNode requests. +type EveryNodeOp interface { + Op() string +} + +// EveryNodeOpResp is an interface satisfied by all allowable EveryNode responses. +type EveryNodeOpResp interface { + everyNodeOpResp() +} + +// Op implements the EveryNodeOp interface. +func (a *AckClusterVersionRequest) Op() string { return "ack-cluster-version" } + +// everyNodeOpResp implements the EveryNodeOpResp interface. +func (a *AckClusterVersionResponse) everyNodeOpResp() {} diff --git a/pkg/server/serverpb/everynode_generated.go b/pkg/server/serverpb/everynode_generated.go new file mode 100644 index 000000000000..e027e12930fb --- /dev/null +++ b/pkg/server/serverpb/everynode_generated.go @@ -0,0 +1,50 @@ +// Code generated by gen_batch.go; DO NOT EDIT. +// GENERATED FILE DO NOT EDIT + +package serverpb + +// GetInner returns the EveryNodeOp contained in the union. +func (ru EveryNodeRequestUnion) GetInner() EveryNodeOp { + switch t := ru.GetValue().(type) { + case *EveryNodeRequestUnion_AckClusterVersion: + return t.AckClusterVersion + default: + return nil + } +} + +// GetInner returns the EveryNodeOpResp contained in the union. +func (ru EveryNodeResponseUnion) GetInner() EveryNodeOpResp { + switch t := ru.GetValue().(type) { + case *EveryNodeResponseUnion_AckClusterVersion: + return t.AckClusterVersion + default: + return nil + } +} + +// SetInner sets the EveryNodeOp in the union. +func (ru *EveryNodeRequestUnion) SetInner(r EveryNodeOp) bool { + var union isEveryNodeRequestUnion_Value + switch t := r.(type) { + case *AckClusterVersionRequest: + union = &EveryNodeRequestUnion_AckClusterVersion{t} + default: + return false + } + ru.Value = union + return true +} + +// SetInner sets the EveryNodeOpResp in the union. +func (ru *EveryNodeResponseUnion) SetInner(r EveryNodeOpResp) bool { + var union isEveryNodeResponseUnion_Value + switch t := r.(type) { + case *AckClusterVersionResponse: + union = &EveryNodeResponseUnion_AckClusterVersion{t} + default: + return false + } + ru.Value = union + return true +} diff --git a/pkg/server/serverpb/gen_everynode.go b/pkg/server/serverpb/gen_everynode.go new file mode 100644 index 000000000000..a385aa8b124e --- /dev/null +++ b/pkg/server/serverpb/gen_everynode.go @@ -0,0 +1,154 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// This file generates batch_generated.go. It can be run via: +// go run -tags gen-everynode gen_everynode.go +// +// It was mostly cargo culted from pkg/roachpb/gen_batch.go + +// +build gen-everynode + +package main + +import ( + "fmt" + "io" + "os" + "reflect" + + "github.com/cockroachdb/cockroach/pkg/server/serverpb" +) + +type variantInfo struct { + // variantType is the name of the variant type that implements + // the union interface (isEveryNodeRequestUnion_Value,isEveryNodeResponseUnion_Value). + variantType string + // variantName is the unique suffix of variantType. It is also + // the name of the single field in this type. + variantName string + // msgType is the name of the variant's corresponding Request/Response + // type. + msgType string +} + +var errVariants []variantInfo +var reqVariants []variantInfo +var resVariants []variantInfo +var reqResVariantMapping map[variantInfo]variantInfo + +func initVariant(varInstance interface{}) variantInfo { + t := reflect.TypeOf(varInstance) + f := t.Elem().Field(0) // variants always have 1 field + return variantInfo{ + variantType: t.Elem().Name(), + variantName: f.Name, + msgType: f.Type.Elem().Name(), + } +} + +func initVariants() { + _, _, _, resVars := (&serverpb.EveryNodeResponseUnion{}).XXX_OneofFuncs() + resVarInfos := make(map[string]variantInfo, len(resVars)) + for _, v := range resVars { + resInfo := initVariant(v) + resVariants = append(resVariants, resInfo) + resVarInfos[resInfo.variantName] = resInfo + } + + _, _, _, reqVars := (&serverpb.EveryNodeRequestUnion{}).XXX_OneofFuncs() + reqResVariantMapping = make(map[variantInfo]variantInfo, len(reqVars)) + for _, v := range reqVars { + reqInfo := initVariant(v) + reqVariants = append(reqVariants, reqInfo) + + resName := reqInfo.variantName + resInfo, ok := resVarInfos[resName] + if !ok { + panic(fmt.Sprintf("unknown response variant %q", resName)) + } + reqResVariantMapping[reqInfo] = resInfo + } +} + +func genGetInner(w io.Writer, unionName, variantName string, variants []variantInfo) { + fmt.Fprintf(w, ` +// GetInner returns the %[2]s contained in the union. +func (ru %[1]s) GetInner() %[2]s { + switch t := ru.GetValue().(type) { +`, unionName, variantName) + + for _, v := range variants { + fmt.Fprintf(w, ` case *%s: + return t.%s +`, v.variantType, v.variantName) + } + + fmt.Fprint(w, ` default: + return nil + } +} +`) +} + +func genSetInner(w io.Writer, unionName, variantName string, variants []variantInfo) { + fmt.Fprintf(w, ` +// SetInner sets the %[2]s in the union. +func (ru *%[1]s) SetInner(r %[2]s) bool { + var union is%[1]s_Value + switch t := r.(type) { +`, unionName, variantName) + + for _, v := range variants { + fmt.Fprintf(w, ` case *%s: + union = &%s{t} +`, v.msgType, v.variantType) + } + + fmt.Fprint(w, ` default: + return false + } + ru.Value = union + return true +} +`) +} + +func main() { + initVariants() + + f, err := os.Create("everynode_generated.go") + if err != nil { + fmt.Fprintln(os.Stderr, "Error opening file: ", err) + os.Exit(1) + } + + // First comment for github/Go; second for reviewable. + // https://github.com/golang/go/issues/13560#issuecomment-277804473 + // https://github.com/Reviewable/Reviewable/wiki/FAQ#how-do-i-tell-reviewable-that-a-file-is-generated-and-should-not-be-reviewed + fmt.Fprint(f, `// Code generated by gen_batch.go; DO NOT EDIT. +// GENERATED FILE DO NOT EDIT + +package serverpb + +`) + + // Generate GetInner methods. + genGetInner(f, "EveryNodeRequestUnion", "EveryNodeOp", reqVariants) + genGetInner(f, "EveryNodeResponseUnion", "EveryNodeOpResp", resVariants) + + // Generate SetInner methods. + genSetInner(f, "EveryNodeRequestUnion", "EveryNodeOp", reqVariants) + genSetInner(f, "EveryNodeResponseUnion", "EveryNodeOpResp", resVariants) + + if err := f.Close(); err != nil { + fmt.Fprintln(os.Stderr, "Error closing file: ", err) + os.Exit(1) + } +}