[3.5] etcd server shouldn't wait for the ready notification infinitely on startup #14064

Closed
4 changes: 4 additions & 0 deletions server/config/config.go
@@ -80,6 +80,10 @@ type ServerConfig struct {
TickMs uint
ElectionTicks int

// WaitClusterReadyTimeout is the maximum time to wait for the
// cluster to be ready on startup before serving client requests.
WaitClusterReadyTimeout time.Duration

// InitialElectionTickAdvance is true, then local member fast-forwards
// election ticks to speed up "initial" leader election trigger. This
// benefits the case of larger election ticks. For instance, cross
43 changes: 23 additions & 20 deletions server/embed/config.go
@@ -49,16 +49,17 @@ const (
ClusterStateFlagNew = "new"
ClusterStateFlagExisting = "existing"

DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
DefaultDowngradeCheckTime = 5 * time.Second
DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
DefaultDowngradeCheckTime = 5 * time.Second
DefaultWaitClusterReadyTimeout = 5 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
@@ -214,14 +215,15 @@ type Config struct {
// Note that cipher suites are prioritized in the given order.
CipherSuites []string `json:"cipher-suites"`

ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
ClusterState string `json:"initial-cluster-state"`
DNSCluster string `json:"discovery-srv"`
DNSClusterServiceName string `json:"discovery-srv-name"`
Dproxy string `json:"discovery-proxy"`
Durl string `json:"discovery"`
InitialCluster string `json:"initial-cluster"`
InitialClusterToken string `json:"initial-cluster-token"`
StrictReconfigCheck bool `json:"strict-reconfig-check"`
ExperimentalWaitClusterReadyTimeout time.Duration `json:"wait-cluster-ready-timeout"`

// EnableV2 exposes the deprecated V2 API surface.
// TODO: Delete in 3.6 (https://github.com/etcd-io/etcd/issues/12913)
@@ -465,8 +467,9 @@ func NewConfig() *Config {
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},

ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
ExperimentalWaitClusterReadyTimeout: DefaultWaitClusterReadyTimeout,

StrictReconfigCheck: DefaultStrictReconfigCheck,
Metrics: "basic",
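Because the new field is exposed on embed.Config, embedded users can override the default as well. A minimal sketch of that usage, assuming the etcd 3.5 embed package; the 10-second value and data directory are arbitrary examples, not part of this PR:

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Wait at most 10s for the cluster to become ready before serving
	// client requests; 5s is the default introduced by this PR.
	cfg.ExperimentalWaitClusterReadyTimeout = 10 * time.Second

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
}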
2 changes: 2 additions & 0 deletions server/embed/etcd.go
@@ -190,6 +190,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
WaitClusterReadyTimeout: cfg.ExperimentalWaitClusterReadyTimeout,
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
@@ -323,6 +324,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.Bool("force-new-cluster", sc.ForceNewCluster),
zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
zap.String("wait-cluster-ready-timeout", sc.WaitClusterReadyTimeout.String()),
zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
zap.Uint64("snapshot-count", sc.SnapshotCount),
zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
11 changes: 10 additions & 1 deletion server/embed/serve.go
@@ -23,6 +23,7 @@ import (
"net"
"net/http"
"strings"
"time"

etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"
"go.etcd.io/etcd/client/pkg/v3/transport"
@@ -93,7 +94,15 @@ func (sctx *serveCtx) serve(
errHandler func(error),
gopts ...grpc.ServerOption) (err error) {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()

// When the quorum isn't satisfied, the etcd server will be blocked
// on <-s.ReadyNotify(). Set a timeout here so that the etcd server
// can continue to serve serializable read requests.
select {
case <-time.After(s.Cfg.WaitClusterReadyTimeout):
sctx.lg.Warn("timed out waiting for the ready notification")
case <-s.ReadyNotify():
}

sctx.lg.Info("ready to serve client requests")

1 change: 1 addition & 0 deletions server/etcdmain/config.go
@@ -290,6 +290,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
@@ -246,6 +246,8 @@ Experimental feature:
Enable the write transaction to use a shared buffer in its readonly check operations.
--experimental-bootstrap-defrag-threshold-megabytes
Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.
--experimental-wait-cluster-ready-timeout '5s'
Set the maximum time duration to wait for the cluster to be ready.

Unsafe feature:
--force-new-cluster 'false'
29 changes: 20 additions & 9 deletions tests/e2e/cluster_test.go
@@ -181,6 +181,26 @@ type etcdProcessClusterConfig struct {
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
epc, err := initEtcdProcessCluster(t, cfg)
if err != nil {
return nil, err
}

if cfg.rollingStart {
if err := epc.RollingStart(); err != nil {
return nil, fmt.Errorf("Cannot rolling-start: %v", err)
}
} else {
if err := epc.Start(); err != nil {
return nil, fmt.Errorf("Cannot start: %v", err)
}
}
return epc, nil
}

// initEtcdProcessCluster initializes a new cluster based on the given config.
// It doesn't start the cluster.
func initEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
skipInShortMode(t)

etcdCfgs := cfg.etcdServerProcessConfigs(t)
@@ -200,15 +220,6 @@ func newEtcdProcessCluster(t testing.TB, cfg *etcdProcessClusterConfig) (*etcdPr
epc.procs[i] = proc
}

if cfg.rollingStart {
if err := epc.RollingStart(); err != nil {
return nil, fmt.Errorf("Cannot rolling-start: %v", err)
}
} else {
if err := epc.Start(); err != nil {
return nil, fmt.Errorf("Cannot start: %v", err)
}
}
return epc, nil
}

79 changes: 79 additions & 0 deletions tests/e2e/ctl_v3_kv_no_quorum_test.go
@@ -0,0 +1,79 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// When the quorum isn't satisfied, each etcd member is unable to
// publish/register its server information (i.e., clientURL) into the cluster.
// Accordingly, the v2 proxy can't get any member's clientURL, so this
// test is bound to fail under the cluster_proxy build.
//
//go:build !cluster_proxy
// +build !cluster_proxy

package e2e

import (
"testing"
)

func TestSerializableReadWithoutQuorum(t *testing.T) {
tcs := []struct {
name string
testFunc func(cx ctlCtx)
}{
{
name: "serializableReadTest",
testFunc: serializableReadTest,
},
{
name: "linearizableReadTest",
testFunc: linearizableReadTest,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
// Initialize a cluster with 3 members
epc, err := initEtcdProcessCluster(t, newConfigAutoTLS())
if err != nil {
t.Fatalf("Failed to initilize the etcd cluster: %v", err)
}

// Remove two members, so that only one etcd will get started
epc.procs = epc.procs[:1]

// Start the etcd cluster with only one member
if err := epc.Start(); err != nil {
t.Fatalf("Failed to start the etcd cluster: %v", err)
}

// Construct the ctl context
cx := getDefaultCtlCtx(t)
cx.epc = epc
runCtlTest(t, tc.testFunc, nil, cx)
})
}
}

func serializableReadTest(cx ctlCtx) {
cx.quorum = false
if err := ctlV3Get(cx, []string{"key1"}, []kv{}...); err != nil {
cx.t.Errorf("serializableReadTest failed: %v", err)
}
}

func linearizableReadTest(cx ctlCtx) {
cx.quorum = true
if err := ctlV3GetWithErr(cx, []string{"key"}, []string{"retrying of unary invoker failed"}); err != nil { // expect errors
cx.t.Fatalf("ctlV3GetWithErr error (%v)", err)
}
}
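For clients, the behavior this test exercises looks roughly like the following clientv3 sketch (not part of this PR; the endpoint and key name are made-up examples): a serializable Get is answered from the local member's store even without quorum, while a linearizable Get requires quorum and keeps failing.

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Serializable read: served by the local member, so it succeeds
	// even when the cluster has no quorum.
	if resp, err := cli.Get(ctx, "key1", clientv3.WithSerializable()); err == nil {
		fmt.Println("serializable read ok, kvs:", len(resp.Kvs))
	}

	// Linearizable read (the default): needs quorum, so it is expected
	// to fail while the cluster isn't ready.
	if _, err := cli.Get(ctx, "key1"); err != nil {
		fmt.Println("linearizable read failed as expected:", err)
	}
}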
38 changes: 23 additions & 15 deletions tests/e2e/ctl_v3_test.go
@@ -212,14 +212,18 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
testCtlWithOffline(t, testFunc, nil, opts...)
}

func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) {
BeforeTest(t)

ret := ctlCtx{
func getDefaultCtlCtx(t *testing.T) ctlCtx {
return ctlCtx{
t: t,
cfg: *newConfigAutoTLS(),
dialTimeout: 7 * time.Second,
}
}

func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), opts ...ctlOption) {
BeforeTest(t)

ret := getDefaultCtlCtx(t)
ret.applyOpts(opts)

if !ret.quorum {
@@ -243,15 +247,19 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
ret.epc = epc
ret.dataDir = epc.procs[0].Config().dataDirPath

runCtlTest(t, testFunc, testOfflineFunc, ret)
}

func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx), cx ctlCtx) {
defer func() {
if ret.envMap != nil {
for k := range ret.envMap {
if cx.envMap != nil {
for k := range cx.envMap {
os.Unsetenv(k)
}
ret.envMap = make(map[string]string)
cx.envMap = make(map[string]string)
}
if ret.epc != nil {
if errC := ret.epc.Close(); errC != nil {
if cx.epc != nil {
if errC := cx.epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}
@@ -260,12 +268,12 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
donec := make(chan struct{})
go func() {
defer close(donec)
testFunc(ret)
testFunc(cx)
t.Log("---testFunc logic DONE")
}()

timeout := 2*ret.dialTimeout + time.Second
if ret.dialTimeout == 0 {
timeout := 2*cx.dialTimeout + time.Second
if cx.dialTimeout == 0 {
timeout = 30 * time.Second
}
select {
@@ -275,12 +283,12 @@ func testCtlWithOffline(t *testing.T, testFunc func(ctlCtx), testOfflineFunc fun
}

t.Log("closing test cluster...")
assert.NoError(t, epc.Close())
epc = nil
assert.NoError(t, cx.epc.Close())
cx.epc = nil
t.Log("closed test cluster...")

if testOfflineFunc != nil {
testOfflineFunc(ret)
testOfflineFunc(cx)
}
}
