From abcb297adabf66d9d75750988dbea3e661081c3f Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Wed, 7 Jul 2021 11:10:35 +0200 Subject: [PATCH] Expose default configuration values for memberlist. (#4276) * Expose default configuration values for memberlist. Set the defaults for various memberlist configuration values based on the "Default LAN" configuration. The only result of this change is that the defaults are now visible and are in the documentation. This also means that if the default values change, then the changes are visible in the documentation, where as before they would have gone unnoticed. To prevent this being a breaking change, the existing behaviour is retained, in case anyone is explicitly setting the values to zero and expecting the default to be used. Signed-off-by: Steve Simpson * Remove use of zero value as default value indicator. Signed-off-by: Steve Simpson * Review comments. Signed-off-by: Steve Simpson * Review comments. Signed-off-by: Steve Simpson Signed-off-by: Alvin Lin --- CHANGELOG.md | 8 + docs/configuration/config-file-reference.md | 25 ++- pkg/cortex/cortex.go | 2 +- .../kv/memberlist/kv_init_service_test.go | 3 + pkg/ring/kv/memberlist/memberlist_client.go | 57 ++++--- .../kv/memberlist/memberlist_client_test.go | 146 +++++++++--------- 6 files changed, 122 insertions(+), 119 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51d62a6efd..732b329900 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,14 @@ * [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` from `5m` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168 * [CHANGE] Ingester: Change default value of `-ingester.active-series-metrics-enabled` to `true`. This incurs a small increase in memory usage, between 1.2% and 1.6% as measured on ingesters with 1.3M active series. #4257 * [CHANGE] Dependency: update go-redis from v8.2.3 to v8.9.0. #4236 +* [CHANGE] Memberlist: Expose default configuration values to the command line options. Note that setting these explicitly to zero will no longer cause the default to be used. If the default is desired, then do set the option. The following are affected: #4276 + - `-memberlist.stream-timeout` + - `-memberlist.retransmit-factor` + - `-memberlist.pull-push-interval` + - `-memberlist.gossip-interval` + - `-memberlist.gossip-nodes` + - `-memberlist.gossip-to-dead-nodes-time` + - `-memberlist.dead-node-reclaim-time` * [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179 * [FEATURE] Querier/Ruler: Added new `-querier.max-fetched-chunk-bytes-per-query` flag. When Cortex is running with blocks storage, the max chunk bytes limit is enforced in the querier and ruler and limits the size of all aggregated chunks returned from ingesters and storage as bytes for a query. #4216 * [FEATURE] Alertmanager: support negative matchers, time-based muting - [upstream release notes](https://github.com/prometheus/alertmanager/releases/tag/v0.22.0). #4237 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index cd1afd7632..cc6e8dd744 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3761,33 +3761,32 @@ The `memberlist_config` configures the Gossip memberlist. [randomize_node_name: | default = true] # The timeout for establishing a connection with a remote node, and for -# read/write operations. Uses memberlist LAN defaults if 0. +# read/write operations. # CLI flag: -memberlist.stream-timeout -[stream_timeout: | default = 0s] +[stream_timeout: | default = 10s] # Multiplication factor used when sending out messages (factor * log(N+1)). # CLI flag: -memberlist.retransmit-factor -[retransmit_factor: | default = 0] +[retransmit_factor: | default = 4] -# How often to use pull/push sync. Uses memberlist LAN defaults if 0. +# How often to use pull/push sync. # CLI flag: -memberlist.pullpush-interval -[pull_push_interval: | default = 0s] +[pull_push_interval: | default = 30s] -# How often to gossip. Uses memberlist LAN defaults if 0. +# How often to gossip. # CLI flag: -memberlist.gossip-interval -[gossip_interval: | default = 0s] +[gossip_interval: | default = 200ms] -# How many nodes to gossip to. Uses memberlist LAN defaults if 0. +# How many nodes to gossip to. # CLI flag: -memberlist.gossip-nodes -[gossip_nodes: | default = 0] +[gossip_nodes: | default = 3] # How long to keep gossiping to dead nodes, to give them chance to refute their -# death. Uses memberlist LAN defaults if 0. +# death. # CLI flag: -memberlist.gossip-to-dead-nodes-time -[gossip_to_dead_nodes_time: | default = 0s] +[gossip_to_dead_nodes_time: | default = 30s] -# How soon can dead node's name be reclaimed with new address. Defaults to 0, -# which is disabled. +# How soon can dead node's name be reclaimed with new address. 0 to disable. # CLI flag: -memberlist.dead-node-reclaim-time [dead_node_reclaim_time: | default = 0s] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 3f19861111..76fb18e0f0 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -170,7 +170,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Alertmanager.RegisterFlags(f) c.AlertmanagerStorage.RegisterFlags(f) c.RuntimeConfig.RegisterFlags(f) - c.MemberlistKV.RegisterFlags(f, "") + c.MemberlistKV.RegisterFlags(f) c.QueryScheduler.RegisterFlags(f) // These don't seem to have a home. diff --git a/pkg/ring/kv/memberlist/kv_init_service_test.go b/pkg/ring/kv/memberlist/kv_init_service_test.go index a6e7001f0d..221da9a66f 100644 --- a/pkg/ring/kv/memberlist/kv_init_service_test.go +++ b/pkg/ring/kv/memberlist/kv_init_service_test.go @@ -7,6 +7,8 @@ import ( "github.com/hashicorp/memberlist" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/flagext" ) func TestPage(t *testing.T) { @@ -53,6 +55,7 @@ func TestPage(t *testing.T) { func TestStop(t *testing.T) { var cfg KVConfig + flagext.DefaultValues(&cfg) kvinit := NewKVInitService(&cfg, nil) require.NoError(t, kvinit.stopping(nil)) } diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index e2d9ede504..c0f962993b 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -165,12 +165,14 @@ type KVConfig struct { } // RegisterFlags registers flags. -func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) { +func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + mlDefaults := defaultMemberlistConfig() + // "Defaults to hostname" -- memberlist sets it to hostname by default. f.StringVar(&cfg.NodeName, prefix+"memberlist.nodename", "", "Name of the node in memberlist cluster. Defaults to hostname.") // memberlist.DefaultLANConfig will put hostname here. f.BoolVar(&cfg.RandomizeNodeName, prefix+"memberlist.randomize-node-name", true, "Add random suffix to the node name.") - f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", 0, "The timeout for establishing a connection with a remote node, and for read/write operations. Uses memberlist LAN defaults if 0.") - f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", 0, "Multiplication factor used when sending out messages (factor * log(N+1)).") + f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", mlDefaults.TCPTimeout, "The timeout for establishing a connection with a remote node, and for read/write operations.") + f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", mlDefaults.RetransmitMult, "Multiplication factor used when sending out messages (factor * log(N+1)).") f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times. It can be an IP, hostname or an entry specified in the DNS Service Discovery format (see https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery for more details).") f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.") f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.") @@ -179,16 +181,20 @@ func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.") - f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", 0, "How often to gossip. Uses memberlist LAN defaults if 0.") - f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", 0, "How many nodes to gossip to. Uses memberlist LAN defaults if 0.") - f.DurationVar(&cfg.PushPullInterval, prefix+"memberlist.pullpush-interval", 0, "How often to use pull/push sync. Uses memberlist LAN defaults if 0.") - f.DurationVar(&cfg.GossipToTheDeadTime, prefix+"memberlist.gossip-to-dead-nodes-time", 0, "How long to keep gossiping to dead nodes, to give them chance to refute their death. Uses memberlist LAN defaults if 0.") - f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", 0, "How soon can dead node's name be reclaimed with new address. Defaults to 0, which is disabled.") + f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", mlDefaults.GossipInterval, "How often to gossip.") + f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", mlDefaults.GossipNodes, "How many nodes to gossip to.") + f.DurationVar(&cfg.PushPullInterval, prefix+"memberlist.pullpush-interval", mlDefaults.PushPullInterval, "How often to use pull/push sync.") + f.DurationVar(&cfg.GossipToTheDeadTime, prefix+"memberlist.gossip-to-dead-nodes-time", mlDefaults.GossipToTheDeadTime, "How long to keep gossiping to dead nodes, to give them chance to refute their death.") + f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") cfg.TCPTransport.RegisterFlags(f, prefix) } +func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(f, "") +} + func generateRandomSuffix() string { suffix := make([]byte, 4) _, err := rand.Read(suffix) @@ -345,36 +351,27 @@ func NewKV(cfg KVConfig, logger log.Logger) *KV { return mlkv } +func defaultMemberlistConfig() *memberlist.Config { + return memberlist.DefaultLANConfig() +} + func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger) if err != nil { return nil, fmt.Errorf("failed to create transport: %v", err) } - mlCfg := memberlist.DefaultLANConfig() + mlCfg := defaultMemberlistConfig() mlCfg.Delegate = m - if m.cfg.StreamTimeout != 0 { - mlCfg.TCPTimeout = m.cfg.StreamTimeout - } - if m.cfg.RetransmitMult != 0 { - mlCfg.RetransmitMult = m.cfg.RetransmitMult - } - if m.cfg.PushPullInterval != 0 { - mlCfg.PushPullInterval = m.cfg.PushPullInterval - } - if m.cfg.GossipInterval != 0 { - mlCfg.GossipInterval = m.cfg.GossipInterval - } - if m.cfg.GossipNodes != 0 { - mlCfg.GossipNodes = m.cfg.GossipNodes - } - if m.cfg.GossipToTheDeadTime > 0 { - mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime - } - if m.cfg.DeadNodeReclaimTime > 0 { - mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime - } + mlCfg.TCPTimeout = m.cfg.StreamTimeout + mlCfg.RetransmitMult = m.cfg.RetransmitMult + mlCfg.PushPullInterval = m.cfg.PushPullInterval + mlCfg.GossipInterval = m.cfg.GossipInterval + mlCfg.GossipNodes = m.cfg.GossipNodes + mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime + mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime + if m.cfg.NodeName != "" { mlCfg.Name = m.cfg.NodeName } diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 22e598197f..de337571da 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" ) @@ -207,12 +208,12 @@ func TestBasicGetAndCas(t *testing.T) { c := dataCodec{} name := "Ing 1" - cfg := KVConfig{ - TCPTransport: TCPTransportConfig{ - BindAddrs: []string{"localhost"}, - }, - Codecs: []codec.Codec{c}, + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, } + cfg.Codecs = []codec.Codec{c} mkv := NewKV(cfg, log.NewNopLogger()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) @@ -265,10 +266,10 @@ func withFixtures(t *testing.T, testFN func(t *testing.T, kv *Client)) { c := dataCodec{} - cfg := KVConfig{ - TCPTransport: TCPTransportConfig{}, - Codecs: []codec.Codec{c}, - } + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{} + cfg.Codecs = []codec.Codec{c} mkv := NewKV(cfg, log.NewNopLogger()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) @@ -410,9 +411,9 @@ func TestCASFailedBecauseOfVersionChanges(t *testing.T) { func TestMultipleCAS(t *testing.T) { c := dataCodec{} - cfg := KVConfig{ - Codecs: []codec.Codec{c}, - } + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.Codecs = []codec.Codec{c} mkv := NewKV(cfg, log.NewNopLogger()) mkv.maxCasRetries = 20 @@ -501,26 +502,21 @@ func TestMultipleClients(t *testing.T) { for i := 0; i < members; i++ { id := fmt.Sprintf("Member-%d", i) - cfg := KVConfig{ - NodeName: id, - - // some useful parameters when playing with higher number of members - // RetransmitMult: 2, - GossipInterval: 100 * time.Millisecond, - GossipNodes: 3, - PushPullInterval: 5 * time.Second, - // PacketDialTimeout: 5 * time.Second, - // StreamTimeout: 5 * time.Second, - // PacketWriteTimeout: 2 * time.Second, - - TCPTransport: TCPTransportConfig{ - BindAddrs: []string{"localhost"}, - BindPort: 0, // randomize ports - }, - - Codecs: []codec.Codec{c}, + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.NodeName = id + + cfg.GossipInterval = 100 * time.Millisecond + cfg.GossipNodes = 3 + cfg.PushPullInterval = 5 * time.Second + + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, // randomize ports } + cfg.Codecs = []codec.Codec{c} + mkv := NewKV(cfg, log.NewNopLogger()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) @@ -645,26 +641,26 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) { for i, port := range ports { id := fmt.Sprintf("Member-%d", i) - cfg := KVConfig{ - NodeName: id, + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.NodeName = id - GossipInterval: 100 * time.Millisecond, - GossipNodes: 3, - PushPullInterval: 5 * time.Second, + cfg.GossipInterval = 100 * time.Millisecond + cfg.GossipNodes = 3 + cfg.PushPullInterval = 5 * time.Second - MinJoinBackoff: 100 * time.Millisecond, - MaxJoinBackoff: 1 * time.Minute, - MaxJoinRetries: 10, - AbortIfJoinFails: true, + cfg.MinJoinBackoff = 100 * time.Millisecond + cfg.MaxJoinBackoff = 1 * time.Minute + cfg.MaxJoinRetries = 10 + cfg.AbortIfJoinFails = true - TCPTransport: TCPTransportConfig{ - BindAddrs: []string{"localhost"}, - BindPort: port, - }, - - Codecs: []codec.Codec{c}, + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: port, } + cfg.Codecs = []codec.Codec{c} + if i == 0 { // Add members to first KV config to join immediately on initialization. // This will enforce backoff as each next members listener is not open yet. @@ -732,21 +728,21 @@ func TestMemberlistFailsToJoin(t *testing.T) { ports, err := getFreePorts(1) require.NoError(t, err) - cfg := KVConfig{ - MinJoinBackoff: 100 * time.Millisecond, - MaxJoinBackoff: 100 * time.Millisecond, - MaxJoinRetries: 2, - AbortIfJoinFails: true, + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.MinJoinBackoff = 100 * time.Millisecond + cfg.MaxJoinBackoff = 100 * time.Millisecond + cfg.MaxJoinRetries = 2 + cfg.AbortIfJoinFails = true - TCPTransport: TCPTransportConfig{ - BindAddrs: []string{"localhost"}, - BindPort: 0, - }, + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, + } - JoinMembers: []string{fmt.Sprintf("127.0.0.1:%d", ports[0])}, + cfg.JoinMembers = []string{fmt.Sprintf("127.0.0.1:%d", ports[0])} - Codecs: []codec.Codec{c}, - } + cfg.Codecs = []codec.Codec{c} mkv := NewKV(cfg, log.NewNopLogger()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) @@ -899,16 +895,16 @@ func (d distributedCounterCodec) Encode(val interface{}) ([]byte, error) { var _ codec.Codec = &distributedCounterCodec{} func TestMultipleCodecs(t *testing.T) { - cfg := KVConfig{ - TCPTransport: TCPTransportConfig{ - BindAddrs: []string{"localhost"}, - BindPort: 0, // randomize - }, + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, // randomize + } - Codecs: []codec.Codec{ - dataCodec{}, - distributedCounterCodec{}, - }, + cfg.Codecs = []codec.Codec{ + dataCodec{}, + distributedCounterCodec{}, } mkv1 := NewKV(cfg, log.NewNopLogger()) @@ -990,17 +986,17 @@ func TestRejoin(t *testing.T) { ports, err := getFreePorts(2) require.NoError(t, err) - cfg1 := KVConfig{ - TCPTransport: TCPTransportConfig{ - BindAddrs: []string{"localhost"}, - BindPort: ports[0], - }, - - RandomizeNodeName: true, - Codecs: []codec.Codec{dataCodec{}}, - AbortIfJoinFails: false, + var cfg1 KVConfig + flagext.DefaultValues(&cfg1) + cfg1.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: ports[0], } + cfg1.RandomizeNodeName = true + cfg1.Codecs = []codec.Codec{dataCodec{}} + cfg1.AbortIfJoinFails = false + cfg2 := cfg1 cfg2.TCPTransport.BindPort = ports[1] cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])}