From ac43a74a2c2dbc0a7e61bb88aae3fb885b013c68 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 29 Aug 2022 12:11:34 +0200 Subject: [PATCH] tests: Validate etcd linearizability Signed-off-by: Marek Siarkowicz --- .github/workflows/linearizability.yaml | 11 ++ Makefile | 4 + go.sum | 3 + pkg/expect/expect.go | 106 +++++++++---- scripts/test.sh | 5 + tests/framework/config/cluster.go | 1 + tests/framework/e2e.go | 5 + tests/framework/e2e/cluster.go | 3 +- tests/framework/e2e/cluster_proxy.go | 2 +- tests/framework/e2e/etcd_process.go | 11 +- tests/framework/e2e/etcd_spawn.go | 2 +- tests/framework/e2e/etcd_spawn_cov.go | 2 +- tests/framework/e2e/etcd_spawn_nocov.go | 14 +- tests/framework/interface.go | 1 + tests/go.mod | 1 + tests/go.sum | 2 + tests/linearizability/main_test.go | 27 ++++ tests/linearizability/model.go | 112 ++++++++++++++ tests/linearizability/putget_test.go | 193 ++++++++++++++++++++++++ 19 files changed, 463 insertions(+), 42 deletions(-) create mode 100644 .github/workflows/linearizability.yaml create mode 100644 tests/linearizability/main_test.go create mode 100644 tests/linearizability/model.go create mode 100644 tests/linearizability/putget_test.go diff --git a/.github/workflows/linearizability.yaml b/.github/workflows/linearizability.yaml new file mode 100644 index 000000000000..09965953f286 --- /dev/null +++ b/.github/workflows/linearizability.yaml @@ -0,0 +1,11 @@ +name: Linearizability +on: [push, pull_request] +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: "1.19.1" + - run: make test-linearizability diff --git a/Makefile b/Makefile index fb05e5ca5053..556d0cb98fb3 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,10 @@ test-e2e: build test-e2e-release: build PASSES="release e2e" ./scripts/test.sh +.PHONY: test-linearizability +test-linearizability: build + PASSES="linearizability" ./scripts/test.sh + # Static analysis verify: verify-gofmt verify-bom verify-lint verify-dep verify-shellcheck verify-goword verify-govet verify-license-header verify-receiver-name verify-mod-tidy verify-shellcheck verify-shellws verify-proto-annotations diff --git a/go.sum b/go.sum index 70338cde9b5f..8b0fac3bedf7 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -327,6 +328,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964= +go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index 3eb636aacbc8..763386f85927 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -36,9 +36,13 @@ const DEBUG_LINES_TAIL = 40 type ExpectProcess struct { cfg expectConfig - cmd *exec.Cmd - fpty *os.File - wg sync.WaitGroup + // StopSignal is the signal Stop sends to the process; defaults to SIGTERM. + StopSignal os.Signal + + cmd *exec.Cmd + fpty *os.File + closech chan struct{} + logsCollected sync.WaitGroup mu sync.Mutex // protects lines and err lines []string @@ -50,18 +54,20 @@ type ExpectProcess struct { // NewExpect creates a new process for expect testing. func NewExpect(name string, arg ...string) (ep *ExpectProcess, err error) { // if env[] is nil, use current system env and the default command as name - return NewExpectWithEnv(name, arg, nil, name) + return NewExpectWithEnv(name, arg, nil, name, false) } // NewExpectWithEnv creates a new process with user defined env variables for expect testing. -func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string) (ep *ExpectProcess, err error) { +func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string, restart bool) (ep *ExpectProcess, err error) { ep = &ExpectProcess{ cfg: expectConfig{ - name: serverProcessConfigName, - cmd: name, - args: args, - env: env, + name: serverProcessConfigName, + cmd: name, + args: args, + env: env, + restart: restart, }, + closech: make(chan struct{}), } ep.cmd = commandFromConfig(ep.cfg) @@ -69,16 +75,17 @@ func NewExpectWithEnv(name string, args []string, env []string, serverProcessCon return nil, err } - ep.wg.Add(1) + ep.logsCollected.Add(1) go ep.read() return ep, nil } type expectConfig struct { - name string - cmd string - args []string - env []string + name string + cmd string + args []string + env []string + restart bool } func commandFromConfig(config expectConfig) *exec.Cmd { @@ -94,23 +101,52 @@ func (ep *ExpectProcess) Pid() int { } func (ep *ExpectProcess) read() { - defer ep.wg.Done() + defer ep.logsCollected.Done() printDebugLines := os.Getenv("EXPECT_DEBUG") != "" - r := bufio.NewReader(ep.fpty) for { - l, err := r.ReadString('\n') ep.mu.Lock() - if l != "" { - if printDebugLines { - fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l) + cmd := ep.cmd + r := bufio.NewReader(ep.fpty) + ep.mu.Unlock() + if cmd == nil { + break + } + pid := cmd.Process.Pid + for { + l, err := r.ReadString('\n') + ep.mu.Lock() + if l != "" { + if printDebugLines { + fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, pid, l) + } + ep.lines = append(ep.lines, l) + ep.count++ } - ep.lines = append(ep.lines, l) - ep.count++ + if err != nil { + ep.err = err + ep.mu.Unlock() + break + } + ep.mu.Unlock() } - if err != nil { + select { + case <-ep.closech: + return + default: + } + ep.mu.Lock() + cmd = ep.cmd + ep.mu.Unlock() + if cmd != nil { + cmd.Wait() + } + ep.mu.Lock() + var err error + ep.cmd = commandFromConfig(ep.cfg) + if ep.fpty, err = pty.Start(ep.cmd); err != nil { + fmt.Printf("Error %s\n", err) ep.err = err - ep.mu.Unlock() - break + ep.cmd = nil } ep.mu.Unlock() } @@ -179,7 +215,10 @@ func (ep *ExpectProcess) Stop() error { return ep.close(true) } // Signal sends a signal to the expect process func (ep *ExpectProcess) Signal(sig os.Signal) error { - return ep.cmd.Process.Signal(sig) + ep.mu.Lock() + err := ep.cmd.Process.Signal(sig) + ep.mu.Unlock() + return err } // Close waits for the expect process to exit. @@ -188,16 +227,22 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error { func (ep *ExpectProcess) Close() error { return ep.close(false) } func (ep *ExpectProcess) close(kill bool) error { - if ep.cmd == nil { + ep.mu.Lock() + cmd := ep.cmd + ep.mu.Unlock() + + if cmd == nil { return ep.err } + close(ep.closech) + if kill { ep.Signal(syscall.SIGTERM) } - err := ep.cmd.Wait() + err := cmd.Wait() ep.fpty.Close() - ep.wg.Wait() + ep.logsCollected.Wait() if err != nil { if !kill && strings.Contains(err.Error(), "exit status") { @@ -207,8 +252,9 @@ func (ep *ExpectProcess) close(kill bool) error { err = nil } } - + ep.mu.Lock() ep.cmd = nil + ep.mu.Unlock() return err } diff --git a/scripts/test.sh b/scripts/test.sh index 5754e828d694..d9ba0fd225fc 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -124,6 +124,11 @@ function e2e_pass { run_for_module "tests" go_test "./common/..." "keep_going" : --tags=e2e -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@" } +function linearizability_pass { + # e2e tests are running pre-build binary. Settings like --race,-cover,-cpu does not have any impact. + run_for_module "tests" go_test "./linearizability/..." "keep_going" : -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@" +} + function integration_e2e_pass { run_pass "integration" "${@}" run_pass "e2e" "${@}" diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go index 5b79e5ff4290..da237150470a 100644 --- a/tests/framework/config/cluster.go +++ b/tests/framework/config/cluster.go @@ -34,4 +34,5 @@ type ClusterConfig struct { DisableStrictReconfigCheck bool AuthToken string SnapshotCount int + Restart bool } diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index 1935cbe0f6d8..8acfcac70a8f 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -52,6 +52,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck, AuthTokenOpts: cfg.AuthToken, SnapshotCount: cfg.SnapshotCount, + Restart: cfg.Restart, } switch cfg.ClientTLS { case config.NoTLS: @@ -109,6 +110,10 @@ func (c *e2eCluster) Client(cfg clientv3.AuthConfig) (Client, error) { return e2eClient{etcdctl}, nil } +func (c *e2eCluster) Endpoints() []string { + return c.EndpointsV3() +} + func (c *e2eCluster) Members() (ms []Member) { for _, proc := range c.EtcdProcessCluster.Procs { ms = append(ms, e2eMember{EtcdProcess: proc, Cfg: c.Cfg}) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 250cc56ecf90..2caf441362f3 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -178,6 +178,7 @@ type EtcdProcessClusterConfig struct { CorruptCheckTime time.Duration CompactHashCheckEnabled bool CompactHashCheckTime time.Duration + Restart bool } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -362,7 +363,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* lg: lg, ExecPath: cfg.ExecPath, Args: args, - EnvVars: cfg.EnvVars, TlsArgs: cfg.TlsArgs(), DataDirPath: dataDirPath, KeepDataDir: cfg.KeepDataDir, @@ -371,6 +371,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* Acurl: curl, Murl: murl, InitialToken: cfg.InitialToken, + Restart: cfg.Restart, } } diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 36042f287a13..1716198c632b 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -121,7 +121,7 @@ func (pp *proxyProc) start() error { if pp.proc != nil { panic("already started") } - proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name) + proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name, false) if err != nil { return err } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index f4c85990714b..2976224aec77 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -19,13 +19,13 @@ import ( "fmt" "net/url" "os" + "syscall" "testing" "time" - "go.uber.org/zap" - "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/expect" + "go.uber.org/zap" ) var ( @@ -47,6 +47,7 @@ type EtcdProcess interface { Close() error Config() *EtcdServerProcessConfig Logs() LogsExpect + Kill() error } type LogsExpect interface { @@ -80,6 +81,7 @@ type EtcdServerProcessConfig struct { InitialToken string InitialCluster string + Restart bool } func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { @@ -104,7 +106,7 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { panic("already started") } ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) - proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name) + proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name, ep.cfg.Restart) if err != nil { return err } @@ -168,6 +170,9 @@ func (ep *EtcdServerProcess) waitReady(ctx context.Context) error { } func (ep *EtcdServerProcess) Config() *EtcdServerProcessConfig { return ep.cfg } +func (ep *EtcdServerProcess) Kill() error { + return ep.proc.Signal(syscall.SIGKILL) +} func (ep *EtcdServerProcess) Logs() LogsExpect { if ep.proc == nil { diff --git a/tests/framework/e2e/etcd_spawn.go b/tests/framework/e2e/etcd_spawn.go index ab86df150a21..adc2c0183b45 100644 --- a/tests/framework/e2e/etcd_spawn.go +++ b/tests/framework/e2e/etcd_spawn.go @@ -27,5 +27,5 @@ func SpawnCmd(args []string, envVars map[string]string) (*expect.ExpectProcess, } func SpawnNamedCmd(processName string, args []string, envVars map[string]string) (*expect.ExpectProcess, error) { - return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName) + return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName, false) } diff --git a/tests/framework/e2e/etcd_spawn_cov.go b/tests/framework/e2e/etcd_spawn_cov.go index 84d680385ef1..7599d1937278 100644 --- a/tests/framework/e2e/etcd_spawn_cov.go +++ b/tests/framework/e2e/etcd_spawn_cov.go @@ -68,7 +68,7 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string zap.String("working-dir", wd), zap.String("name", name), zap.Strings("environment-variables", env)) - return expect.NewExpectWithEnv(cmd, allArgs, env, name) + return expect.NewExpectWithEnv(cmd, allArgs, env, name, false) } func getCovArgs() ([]string, error) { diff --git a/tests/framework/e2e/etcd_spawn_nocov.go b/tests/framework/e2e/etcd_spawn_nocov.go index d050e8b52218..ee2089679ac0 100644 --- a/tests/framework/e2e/etcd_spawn_nocov.go +++ b/tests/framework/e2e/etcd_spawn_nocov.go @@ -28,7 +28,7 @@ import ( const noOutputLineCount = 0 // regular binaries emit no extra lines -func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string) (*expect.ExpectProcess, error) { +func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string, restart bool) (*expect.ExpectProcess, error) { wd, err := os.Getwd() if err != nil { return nil, err @@ -40,13 +40,17 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string zap.Strings("args", args), zap.String("working-dir", wd), zap.String("name", name), - zap.Strings("environment-variables", env)) - return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name) + zap.Strings("environment-variables", env), + zap.Bool("restart", restart), + ) + return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name, restart) } lg.Info("spawning process", zap.Strings("args", args), zap.String("working-dir", wd), zap.String("name", name), - zap.Strings("environment-variables", env)) - return expect.NewExpectWithEnv(args[0], args[1:], env, name) + zap.Strings("environment-variables", env), + zap.Bool("restart", restart), + ) + return expect.NewExpectWithEnv(args[0], args[1:], env, name, restart) } diff --git a/tests/framework/interface.go b/tests/framework/interface.go index 696879be9f29..3f49b3fd7648 100644 --- a/tests/framework/interface.go +++ b/tests/framework/interface.go @@ -33,6 +33,7 @@ type Cluster interface { Client(cfg clientv3.AuthConfig) (Client, error) WaitLeader(t testing.TB) int Close() error + Endpoints() []string } type Member interface { diff --git a/tests/go.mod b/tests/go.mod index d806421d986d..ce2f0942b207 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -15,6 +15,7 @@ replace ( ) require ( + github.com/anishathalye/porcupine v0.1.2 github.com/coreos/go-semver v0.3.0 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 diff --git a/tests/go.sum b/tests/go.sum index 7c4432c9c848..1d758e57df66 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -43,6 +43,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2 h1:eqWNeLcnTzXt6usipDJ4RFn6XOWqY5wEqBYVG3yFLSE= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/tests/linearizability/main_test.go b/tests/linearizability/main_test.go new file mode 100644 index 000000000000..61f84b5322cc --- /dev/null +++ b/tests/linearizability/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "testing" + + "go.etcd.io/etcd/tests/v3/framework" +) + +var testRunner = framework.E2eTestRunner + +func TestMain(m *testing.M) { + testRunner.TestMain(m) +} diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go new file mode 100644 index 000000000000..4d817d5527b6 --- /dev/null +++ b/tests/linearizability/model.go @@ -0,0 +1,112 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "encoding/json" + "fmt" + + "github.com/anishathalye/porcupine" +) + +type Operation int8 + +const Read Operation = 0 +const Put Operation = 1 + +type etcdRequest struct { + op Operation + writeData string +} + +type etcdResponse struct { + readData string + err error +} + +type EtcdState struct { + Value string + FailedWrites []string +} + +var etcdModel = porcupine.Model{ + Init: func() interface{} { return "{}" }, + Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { + stateString := st.(string) + var state EtcdState + err := json.Unmarshal([]byte(stateString), &state) + if err != nil { + panic(err) + } + request := in.(etcdRequest) + response := out.(etcdResponse) + ok, state := step(state, request, response) + data, err := json.Marshal(state) + if err != nil { + panic(err) + } + return ok, string(data) + }, + DescribeOperation: func(in, out interface{}) string { + request := in.(etcdRequest) + response := out.(etcdResponse) + var call, args, resp string + switch request.op { + case Read: + call = "read" + if response.err != nil { + resp = response.err.Error() + } else { + resp = response.readData + } + case Put: + call = "write" + args = request.writeData + if response.err != nil { + resp = response.err.Error() + } else { + resp = "ok" + } + default: + return "" + } + return fmt.Sprintf("%s(%q) -> %s", call, args, resp) + }, +} + +func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { + var ok bool + switch request.op { + case Read: + ok = state.Value == response.readData + if !ok { + for i, write := range state.FailedWrites { + if write == response.readData { + ok = true + state = EtcdState{Value: write, FailedWrites: append(state.FailedWrites[:i], state.FailedWrites[i+1:]...)} + break + } + } + } + case Put: + if response.err == nil { + state.Value = request.writeData + } else { + state.FailedWrites = append(state.FailedWrites, request.writeData) + } + ok = true + } + return ok, state +} diff --git a/tests/linearizability/putget_test.go b/tests/linearizability/putget_test.go new file mode 100644 index 000000000000..35950347559f --- /dev/null +++ b/tests/linearizability/putget_test.go @@ -0,0 +1,193 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "context" + "fmt" + "net/http" + "sync" + "testing" + "time" + + "github.com/anishathalye/porcupine" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/framework/testutils" + "go.uber.org/zap" +) + +const maxOperationsPerClient = 1000000 + +var httpClient = http.Client{ + Timeout: 10 * time.Millisecond, +} +var waitBetweenTriggers = time.Second + +func TestPutGetLinearizability(t *testing.T) { + testRunner.BeforeTest(t) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) + defer cancel() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, &e2e.EtcdProcessClusterConfig{ + InitialToken: "new", + ClusterSize: 1, + Restart: true, + }) + if err != nil { + t.Fatal(err) + } + defer clus.Close() + + // Increasing number of failpoints number of tries making test more accurate but requiring more time. + failpointsCount := 60 + minimalQPS := 100.0 + allFailpointsInjected := make(chan struct{}) + go triggerFailpoints(ctx, t, clus, failpointsCount, allFailpointsInjected) + start := time.Now() + operations := simulateTraffic(ctx, t, clus, 1, 8, allFailpointsInjected) + end := time.Now() + + t.Logf("Recorded %d operations", len(operations)) + qps := float64(len(operations)) / float64(end.Sub(start)) * float64(time.Second) + t.Logf("Average traffic: %f qps", qps) + if qps < minimalQPS { + t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", minimalQPS, qps) + } + linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0) + if linearizable != porcupine.Ok { + t.Error("Model is not linearizable, saving visualization to /tmp/results.html") + err := porcupine.VisualizePath(etcdModel, info, "/tmp/results.html") + if err != nil { + t.Errorf("Failed to visualize, err: %v", err) + } + } +} + +func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, failpointsCount int, finished chan<- struct{}) { + triggers := 0 + + time.Sleep(waitBetweenTriggers) + ctx, cancel := context.WithTimeout(ctx, 2*time.Duration(failpointsCount)*waitBetweenTriggers) + defer cancel() + testutils.ExecuteUntil(ctx, t, func() { + var err error + for { + err = clus.Procs[0].Kill() + if err != nil { + t.Log(err) + continue + } + triggers++ + if triggers >= failpointsCount { + break + } + time.Sleep(waitBetweenTriggers) + } + }) + time.Sleep(waitBetweenTriggers) + close(finished) +} + +func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, memberCount int, clientCount int, finished <-chan struct{}) (operations []porcupine.Operation) { + startTime := time.Now() + mux := sync.Mutex{} + endpoints := clus.EndpointsV3() + if len(endpoints) != memberCount { + t.Fatalf("Unexpected number of endpoints, got %d, expected %d", len(endpoints), memberCount) + } + + testutils.ExecuteUntil(ctx, t, func() { + wg := sync.WaitGroup{} + for i := 0; i < clientCount; i++ { + cc, err := clientv3.New(clientv3.Config{ + Endpoints: []string{endpoints[i%memberCount]}, + Logger: zap.NewNop(), + DialTimeout: 5 * time.Millisecond, + DialKeepAliveTime: 1 * time.Millisecond, + DialKeepAliveTimeout: 5 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + defer cc.Close() + wg.Add(1) + go func(clientId int) { + defer wg.Done() + op, err := putGet(cc, clientId, startTime, finished) + if err != nil { + t.Error(err) + return + } + mux.Lock() + operations = append(operations, op...) + mux.Unlock() + }(i) + } + + wg.Wait() + }) + return operations +} + +func putGet(cc *clientv3.Client, clientId int, startTime time.Time, finished <-chan struct{}) (operations []porcupine.Operation, err error) { + id := maxOperationsPerClient * clientId + ctx := context.Background() + key := "key" + + for i := 0; i < maxOperationsPerClient; { + select { + case <-finished: + return operations, nil + default: + } + getStartTime := time.Now() + getCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + resp, err := cc.Get(getCtx, key) + cancel() + getResponseTime := time.Now() + if err != nil { + continue + } + var readData string + if len(resp.Kvs) == 1 { + readData = string(resp.Kvs[0].Value) + } + operations = append(operations, porcupine.Operation{ + ClientId: clientId, + Input: etcdRequest{op: Read}, + Call: getStartTime.Sub(startTime).Nanoseconds(), + Output: etcdResponse{readData: readData}, + Return: getResponseTime.Sub(startTime).Nanoseconds(), + }) + putData := fmt.Sprintf("%d", id+i) + putStartTime := time.Now() + putCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + _, err = cc.Put(putCtx, key, putData) + cancel() + putResponseTime := time.Now() + operations = append(operations, porcupine.Operation{ + ClientId: clientId, + Input: etcdRequest{op: Put, writeData: putData}, + Call: putStartTime.Sub(startTime).Nanoseconds(), + Output: etcdResponse{err: err}, + Return: putResponseTime.Sub(startTime).Nanoseconds(), + }) + if err != nil { + continue + } + i++ + } + return operations, nil +}