Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add downgrade cancellation e2e tests #19252

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ DOWNGRADE ENABLE starts a downgrade action to cluster.
Downgrade enable success, cluster version 3.6
```

### DOWNGRADE CANCEL \<TARGET_VERSION\>
### DOWNGRADE CANCEL

DOWNGRADE CANCEL cancels the ongoing downgrade action to cluster.

Expand Down
106 changes: 98 additions & 8 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package e2e
import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
Expand All @@ -37,23 +39,56 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

type CancellationState int

const (
noCancellation CancellationState = iota
cancelRightBeforeEnable
cancelRightAfterEnable
cancelAfterDowngrading
)

func TestDowngradeUpgradeClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, false)
testDowngradeUpgrade(t, 1, 1, false, noCancellation)
}

func TestDowngradeUpgradeClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, false)
testDowngradeUpgrade(t, 3, 3, false, noCancellation)
}

func TestDowngradeUpgradeClusterOf1WithSnapshot(t *testing.T) {
testDowngradeUpgrade(t, 1, true)
testDowngradeUpgrade(t, 1, 1, true, noCancellation)
}

func TestDowngradeUpgradeClusterOf3WithSnapshot(t *testing.T) {
testDowngradeUpgrade(t, 3, true)
testDowngradeUpgrade(t, 3, 3, true, noCancellation)
}

func TestDowngradeCancellationWithoutEnablingClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, 1, false, cancelRightBeforeEnable)
}

func TestDowngradeCancellationRightAfterEnablingClusterOf1(t *testing.T) {
testDowngradeUpgrade(t, 1, 1, false, cancelRightAfterEnable)
}

func TestDowngradeCancellationWithoutEnablingClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, 3, false, cancelRightBeforeEnable)
}

func TestDowngradeCancellationRightAfterEnablingClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 3, 3, false, cancelRightAfterEnable)
}

func TestDowngradeCancellationAfterDowngrading1InClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 1, 3, false, cancelAfterDowngrading)
}

func TestDowngradeCancellationAfterDowngrading2InClusterOf3(t *testing.T) {
testDowngradeUpgrade(t, 2, 3, false, cancelAfterDowngrading)
}

func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
func testDowngradeUpgrade(t *testing.T, cancellationSize int, clusterSize int, triggerSnapshot bool, triggerCancellation CancellationState) {
currentEtcdBinary := e2e.BinPath.Etcd
lastReleaseBinary := e2e.BinPath.EtcdLastRelease
if !fileutil.Exist(lastReleaseBinary) {
Expand Down Expand Up @@ -107,10 +142,27 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
require.NoError(t, err)
beforeMembers, beforeKV := getMembersAndKeys(t, cc)

if triggerCancellation == cancelRightBeforeEnable {
t.Logf("Cancelling downgrade before enabling")
e2e.DowngradeCancel(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
return // No need to perform downgrading, end the test here
}
e2e.DowngradeEnable(t, epc, lastVersion)
if triggerCancellation == cancelRightAfterEnable {
t.Logf("Cancelling downgrade right after enabling (no node is downgraded yet)")
e2e.DowngradeCancel(t, epc, generateIdenticalVersions(clusterSize, currentVersion))
return // No need to perform downgrading, end the test here
}

membersToChange := rand.Perm(len(epc.Procs))[:cancellationSize]
t.Logf(fmt.Sprintln("Elect members for operations"), zap.Any("members", membersToChange))

t.Logf("Starting downgrade process to %q", lastVersionStr)
e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion)
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, currentVersion, lastClusterVersion)
require.NoError(t, err)
if len(membersToChange) == len(epc.Procs) {
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
}

t.Log("Downgrade complete")
afterMembers, afterKV := getMembersAndKeys(t, cc)
Expand All @@ -121,6 +173,11 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}

if triggerCancellation == cancelAfterDowngrading {
e2e.DowngradeCancel(t, epc, generatePartialCancellationVersions(clusterSize, membersToChange, lastClusterVersion))
}

t.Log("Adding learner to test membership, but avoid breaking quorum")
resp, err = cc.MemberAddAsLearner(context.Background(), "fake2", []string{"http://127.0.0.1:1002"})
require.NoError(t, err)
Expand All @@ -135,7 +192,8 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
beforeMembers, beforeKV = getMembersAndKeys(t, cc)

t.Logf("Starting upgrade process to %q", currentVersionStr)
e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion)
err = e2e.DowngradeUpgradeMembersByID(t, nil, epc, membersToChange, lastClusterVersion, currentVersion)
require.NoError(t, err)
t.Log("Upgrade complete")

afterMembers, afterKV = getMembersAndKeys(t, cc)
Expand Down Expand Up @@ -233,3 +291,35 @@ func getMembersAndKeys(t *testing.T, cc *e2e.EtcdctlV3) (*clientv3.MemberListRes

return members, kvs
}

func generateIdenticalVersions(clusterSize int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: ver.String(),
Server: ver.String(),
Storage: ver.String(),
}
}

return ret
}

func generatePartialCancellationVersions(clusterSize int, membersToChange []int, ver *semver.Version) []*version.Versions {
ret := make([]*version.Versions, clusterSize)

for i := range clusterSize {
ret[i] = &version.Versions{
Cluster: ver.String(),
Server: e2e.OffsetMinor(ver, 1).String(),
Storage: "",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will continue to investigate this tomorrow

}
}

for i := range membersToChange {
ret[membersToChange[i]].Server = ver.String()
}

return ret
}
37 changes: 31 additions & 6 deletions tests/framework/e2e/downgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
for i := 0; i < len(epc.Procs); i++ {
ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: ver.String(),
Server: offsetMinor(ver, 1).String(),
Server: OffsetMinor(ver, 1).String(),
Storage: ver.String(),
})
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
Expand All @@ -53,7 +53,34 @@ func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version)
t.Log("Cluster is ready for downgrade")
}

func DowngradeCancel(t *testing.T, epc *EtcdProcessCluster, versions []*version.Versions) {
t.Logf("etcdctl downgrade cancel")
c := epc.Etcdctl()
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
err := c.DowngradeCancel(context.TODO())
require.NoError(t, err)
})

t.Log("Downgrade cancelled, validating if cluster is in the right state")
for i := 0; i < len(epc.Procs); i++ {
ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: versions[i].Cluster,
Server: versions[i].Server,
Storage: versions[i].Storage,
})
}

t.Log("Cluster downgrade cancellation is completed")
}

func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
t.Logf(fmt.Sprintln("Elect members for operations"), zap.Any("members", membersToChange))

return DowngradeUpgradeMembersByID(t, lg, clus, membersToChange, currentVersion, targetVersion)
}

func DowngradeUpgradeMembersByID(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, membersToChange []int, currentVersion, targetVersion *semver.Version) error {
if lg == nil {
lg = clus.lg
}
Expand All @@ -64,8 +91,6 @@ func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessClus
opString = "downgrading"
newExecPath = BinPath.EtcdLastRelease
}
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))

// Need to wait health interval for cluster to prepare for downgrade/upgrade
time.Sleep(etcdserver.HealthInterval)
Expand All @@ -89,7 +114,7 @@ func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessClus
lg.Info("Validating versions")
for _, memberID := range membersToChange {
member := clus.Procs[memberID]
if isDowngrade || numberOfMembersToChange == len(clus.Procs) {
if isDowngrade || len(membersToChange) == len(clus.Procs) {
ValidateVersion(t, clus.Cfg, member, version.Versions{
Cluster: targetVersion.String(),
Server: targetVersion.String(),
Expand Down Expand Up @@ -125,8 +150,8 @@ func ValidateVersion(t *testing.T, cfg *EtcdProcessClusterConfig, member EtcdPro
})
}

// offsetMinor returns the version with offset from the original minor, with the same major.
func offsetMinor(v *semver.Version, offset int) *semver.Version {
// OffsetMinor returns the version with offset from the original minor, with the same major.
func OffsetMinor(v *semver.Version, offset int) *semver.Version {
var minor int64
if offset >= 0 {
minor = v.Minor + int64(offset)
Expand Down
5 changes: 5 additions & 0 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (ctl *EtcdctlV3) DowngradeEnable(ctx context.Context, version string) error
return err
}

func (ctl *EtcdctlV3) DowngradeCancel(ctx context.Context) error {
_, err := SpawnWithExpectLines(ctx, ctl.cmdArgs("downgrade", "cancel"), nil, expect.ExpectedResponse{Value: "Downgrade cancel success"})
return err
}

func (ctl *EtcdctlV3) Get(ctx context.Context, key string, o config.GetOptions) (*clientv3.GetResponse, error) {
resp := clientv3.GetResponse{}
var args []string
Expand Down
Loading