From 40b623b10f01ad10f22dc59968335b954e65b3e5 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Fri, 9 Jul 2021 14:22:28 +0200 Subject: [PATCH] Expose configuration of memberlist packet compression. (#4346) * Expose configuration of memberlist packet compression. Allows manually specifying whether memberlist should compress packets via a new configuration flag: `-memberlist.compression-enabled`. This typically has little benefit for Cortex: the ring state messages are already compressed with Snappy, so the second layer of compression does not achieve any additional saving. It's not clear-cut whether there might still be some benefit for internal memberlist messages; this needs to be evaluated in an environment of some reasonable scale. Signed-off-by: Steve Simpson * Review comments. Signed-off-by: Steve Simpson * Review comments. Signed-off-by: Steve Simpson --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 ++++ ...tegration_memberlist_single_binary_test.go | 29 ++++++++++++------- pkg/ring/kv/memberlist/memberlist_client.go | 3 ++ 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b56acd02d2..ee2ae2906a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * `-compactor.ring.heartbeat-period` * `-store-gateway.sharding-ring.heartbeat-period` * [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345 +* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression-enabled`. #4346 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. 
#4336 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index b8892152ea..77d98634fe 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3796,6 +3796,11 @@ The `memberlist_config` configures the Gossip memberlist. # CLI flag: -memberlist.dead-node-reclaim-time [dead_node_reclaim_time: <duration> | default = 0s] +# Enable message compression. This can be used to reduce bandwidth usage at the +# cost of slightly more CPU utilization. +# CLI flag: -memberlist.compression-enabled +[compression_enabled: <boolean> | default = true] + # Other cluster members to join. Can be specified multiple times. It can be an # IP, hostname or an entry specified in the DNS Service Discovery format (see # https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index 53c1a945fd..f4cd7b7914 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -23,15 +23,21 @@ import ( func TestSingleBinaryWithMemberlist(t *testing.T) { t.Run("default", func(t *testing.T) { - testSingleBinaryEnv(t, false) + testSingleBinaryEnv(t, false, nil) }) t.Run("tls", func(t *testing.T) { - testSingleBinaryEnv(t, true) + testSingleBinaryEnv(t, true, nil) + }) + + t.Run("compression-disabled", func(t *testing.T) { + testSingleBinaryEnv(t, false, map[string]string{ + "-memberlist.compression-enabled": "false", + }) }) } -func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { +func testSingleBinaryEnv(t *testing.T, tlsEnabled bool, flags map[string]string) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) defer s.Close() @@ -65,13 +71,13 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { filepath.Join(s.SharedDir(), clientKeyFile), )) - cortex1 = 
newSingleBinary("cortex-1", memberlistDNS, "") - cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000") - cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000") + cortex1 = newSingleBinary("cortex-1", memberlistDNS, "", flags) + cortex2 = newSingleBinary("cortex-2", memberlistDNS, networkName+"-cortex-1:8000", flags) + cortex3 = newSingleBinary("cortex-3", memberlistDNS, networkName+"-cortex-1:8000", flags) } else { - cortex1 = newSingleBinary("cortex-1", "", "") - cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000") - cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000") + cortex1 = newSingleBinary("cortex-1", "", "", flags) + cortex2 = newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags) + cortex3 = newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000", flags) } // start cortex-1 first, as cortex-2 and cortex-3 both connect to cortex-1 @@ -109,7 +115,7 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { require.NoError(t, s.Stop(cortex3)) } -func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService { +func newSingleBinary(name string, servername string, join string, testFlags map[string]string) *e2ecortex.CortexService { flags := map[string]string{ "-ingester.final-sleep": "0s", "-ingester.join-after": "0s", // join quickly @@ -132,6 +138,7 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.Cor mergeFlags( ChunksStorageFlags(), flags, + testFlags, getTLSFlagsWithPrefix("memberlist", servername, servername == ""), ), "", @@ -170,7 +177,7 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { if i > 0 { join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000) } - c := newSingleBinary(name, "", join) + c := newSingleBinary(name, "", join, nil) require.NoError(t, s.StartAndWaitReady(c)) instances = append(instances, c) } diff --git 
a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 7e45868bd9..79432d4668 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -136,6 +136,7 @@ type KVConfig struct { GossipNodes int `yaml:"gossip_nodes"` GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"` + EnableCompression bool `yaml:"compression_enabled"` // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` @@ -187,6 +188,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.GossipToTheDeadTime, prefix+"memberlist.gossip-to-dead-nodes-time", mlDefaults.GossipToTheDeadTime, "How long to keep gossiping to dead nodes, to give them chance to refute their death.") f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.") f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.") + f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") cfg.TCPTransport.RegisterFlags(f, prefix) } @@ -380,6 +382,7 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { mlCfg.GossipNodes = m.cfg.GossipNodes mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime + mlCfg.EnableCompression = m.cfg.EnableCompression if m.cfg.NodeName != "" { mlCfg.Name = m.cfg.NodeName