From d934e2a7bd39b872651dd84d2446c47e1e3e4b2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 13 Aug 2021 12:13:44 +0200 Subject: [PATCH 1/3] Memberlist now forwards only changes, not full original received message. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/memberlist_client.go | 16 +--- .../kv/memberlist/memberlist_client_test.go | 94 +++++++++++++++++++ 2 files changed, 96 insertions(+), 14 deletions(-) diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 79432d4668..0121509887 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -980,20 +980,8 @@ func (m *KV) NotifyMsg(msg []byte) { } else if version > 0 { m.notifyWatchers(kvPair.Key) - m.addSentMessage(message{ - Time: time.Now(), - Size: len(msg), - Pair: kvPair, - Version: version, - Changes: changes, - }) - - // Forward this message - // Memberlist will modify message once this function returns, so we need to make a copy - msgCopy := append([]byte(nil), msg...) - - // forward this message further - m.queueBroadcast(kvPair.Key, mod.MergeContent(), version, msgCopy) + // Don't resend original message, but only changes. + m.broadcastNewValue(kvPair.Key, mod, version, codec) } } diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 2782c6f931..4bc6565f44 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -1074,3 +1074,97 @@ func TestMessageBuffer(t *testing.T) { assert.Len(t, buf, 2) assert.Equal(t, size, 75) } + +func TestNotifyMsgResendsOnlyChanges(t *testing.T) { + codec := dataCodec{} + + cfg := KVConfig{} + // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, codec) + + kv := NewKV(cfg, log.NewNopLogger()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) + defer services.StopAndAwaitTerminated(context.Background(), kv) + + client, err := NewClient(kv, codec) + require.NoError(t, err) + + // No broadcast messages from KV at the beginning. + require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + now := time.Now() + + require.NoError(t, client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) { + d := getOrCreateData(in) + d.Members["a"] = member{Timestamp: now.Unix(), State: JOINING} + d.Members["b"] = member{Timestamp: now.Unix(), State: JOINING} + return d, true, nil + })) + + // Check that new instance is broadcasted about just once. + assert.Equal(t, 1, len(kv.GetBroadcasts(0, math.MaxInt32))) + require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + kv.NotifyMsg(marshalKeyValuePair(t, key, codec, &data{ + Members: map[string]member{ + "a": {Timestamp: now.Unix() - 5, State: ACTIVE}, + "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, + "c": {Timestamp: now.Unix(), State: ACTIVE}, + }})) + + // Check two things here: + // 1) state of value in KV store + // 2) broadcast message only has changed members + + d := getData(t, client, key) + assert.Equal(t, &data{ + Members: map[string]member{ + "a": {Timestamp: now.Unix(), State: JOINING, Tokens: []uint32{}}, // unchanged, timestamp too old + "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, + "c": {Timestamp: now.Unix(), State: ACTIVE, Tokens: []uint32{}}, + }}, d) + + bs := kv.GetBroadcasts(0, math.MaxInt32) + require.Equal(t, 1, len(bs)) + + d = decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec) + assert.Equal(t, &data{ + Members: map[string]member{ + // "a" is not here, because it wasn't changed by the message. + "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, + "c": {Timestamp: now.Unix(), State: ACTIVE}, + }}, d) +} + +func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data { + kvp := KeyValuePair{} + require.NoError(t, kvp.Unmarshal(marshalledKVP)) + require.Equal(t, key, kvp.Key) + + val, err := codec.Decode(kvp.Value) + require.NoError(t, err) + d, ok := val.(*data) + require.True(t, ok) + return d +} + +func marshalKeyValuePair(t *testing.T, key string, codec codec.Codec, value interface{}) []byte { + data, err := codec.Encode(value) + require.NoError(t, err) + + kvp := KeyValuePair{Key: key, Codec: codec.CodecID(), Value: data} + data, err = kvp.Marshal() + require.NoError(t, err) + return data +} + +func getOrCreateData(in interface{}) *data { + // Modify value that was passed as a parameter. + // Client takes care of concurrent modifications. + r, ok := in.(*data) + if !ok || r == nil { + return &data{Members: map[string]member{}} + } + return r +} From 08915189bc648222b8cde4f957595e9a1ae23626 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 13 Aug 2021 12:18:10 +0200 Subject: [PATCH 2/3] CHANGELOG.md entry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1391845101..782719e4ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * Users only have control of the HTTP header when Cortex is not frontend by an auth proxy validating the tenant IDs * [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394 * [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328 +* [CHANGE] Memberlist: forward only changes, not entire original message. #4419 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 * [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341 * [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342 From afc21f91f5eb3dd33a7beb4e74a71ce6cf67c5aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 13 Aug 2021 12:22:48 +0200 Subject: [PATCH 3/3] Ignore linter here. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ring/kv/memberlist/memberlist_client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 4bc6565f44..150db4896d 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -1085,7 +1085,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { kv := NewKV(cfg, log.NewNopLogger()) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) - defer services.StopAndAwaitTerminated(context.Background(), kv) + defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck client, err := NewClient(kv, codec) require.NoError(t, err)