From a0f26ff4ea991c9d1417f7d10732e52536e36e4e Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 11 Feb 2022 15:04:44 +0100 Subject: [PATCH] server: Snapshot after cluster version downgrade --- server/etcdserver/apply.go | 15 ++++++++++++++- server/etcdserver/server.go | 12 ++++++++++-- tests/e2e/cluster_downgrade_test.go | 13 ------------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index f91f859f511..7eb53ebcc61 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -936,7 +936,20 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList } func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) { - a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3) + prevVersion := a.s.Cluster().Version() + newVersion := semver.Must(semver.NewVersion(r.Ver)) + a.s.cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3) + // Force snapshot after cluster version downgrade. + if prevVersion != nil && newVersion.LessThan(*prevVersion) { + lg := a.s.Logger() + if lg != nil { + lg.Info("Cluster version downgrade detected, forcing snapshot", + zap.String("prev-cluster-version", prevVersion.String()), + zap.String("new-cluster-version", newVersion.String()), + ) + } + a.s.forceSnapshot = true + } } func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 8d6a430cc37..d10559a4e21 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -291,6 +291,9 @@ type EtcdServer struct { clusterVersionChanged *notify.Notifier *AccessController + // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount. + // Should only be set within apply code path. Used to force snapshot after cluster version downgrade. + forceSnapshot bool } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -1079,10 +1082,9 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { } func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { - if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount { + if !s.shouldSnapshot(ep) { return } - lg := s.Logger() lg.Info( "triggering snapshot", @@ -1090,12 +1092,18 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { zap.Uint64("local-member-applied-index", ep.appliedi), zap.Uint64("local-member-snapshot-index", ep.snapi), zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount), + zap.Bool("snapshot-forced", s.forceSnapshot), ) + s.forceSnapshot = false s.snapshot(ep.appliedi, ep.confState) ep.snapi = ep.appliedi } +func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool { + return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount) +} + func (s *EtcdServer) hasMultipleVotingMembers() bool { return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1 } diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go index 655067ceabd..5e2eb9ff209 100644 --- a/tests/e2e/cluster_downgrade_test.go +++ b/tests/e2e/cluster_downgrade_test.go @@ -17,7 +17,6 @@ package e2e import ( "context" "fmt" - "strings" "testing" "time" @@ -66,8 +65,6 @@ func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessClust ClusterSize: 1, InitialToken: "new", KeepDataDir: true, - // TODO: REMOVE snapshot override when snapshotting is automated after lowering storage versiont l - SnapshotCount: 5, }) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) @@ -77,16 +74,6 @@ func startEtcd(t *testing.T, execPath, dataDirPath string) *e2e.EtcdProcessClust t.Fatalf("error closing etcd processes (%v)", errC) } }) - - prefixArgs := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")} - t.Log("Write keys to ensure wal snapshot is created so cluster version set is snapshotted") - e2e.ExecuteWithTimeout(t, 20*time.Second, func() { - for i := 0; i < 10; i++ { - if err := e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), "OK"); err != nil { - t.Fatal(err) - } - } - }) return epc }