From 3c42c38df11602dec25ce1f05d0a8e9d371015a7 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Tue, 26 Apr 2022 10:22:25 -0400 Subject: [PATCH 1/2] Update to latest dskit and memberlist fork Fixes #1743 Signed-off-by: Nick Pillitteri --- go.mod | 4 +- go.sum | 8 +- .../grafana/dskit/services/basic_service.go | 2 +- .../grafana/dskit/spanlogger/spanlogger.go | 2 +- .../github.com/hashicorp/memberlist/config.go | 11 ++ .../github.com/hashicorp/memberlist/label.go | 178 ++++++++++++++++++ .../hashicorp/memberlist/memberlist.go | 13 +- vendor/github.com/hashicorp/memberlist/net.go | 139 +++++++++++--- .../hashicorp/memberlist/peeked_conn.go | 48 +++++ .../hashicorp/memberlist/security.go | 19 ++ .../github.com/hashicorp/memberlist/state.go | 25 ++- .../hashicorp/memberlist/transport.go | 47 +++++ .../github.com/hashicorp/memberlist/util.go | 4 +- vendor/modules.txt | 6 +- 14 files changed, 457 insertions(+), 49 deletions(-) create mode 100644 vendor/github.com/hashicorp/memberlist/label.go create mode 100644 vendor/github.com/hashicorp/memberlist/peeked_conn.go diff --git a/go.mod b/go.mod index 76e384a2eb5..02946117380 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca + github.com/grafana/dskit v0.0.0-20220426141700-ec983e9d5345 github.com/grafana/e2e v0.1.0 github.com/hashicorp/golang-lru v0.5.4 github.com/json-iterator/go v1.1.12 @@ -236,6 +236,6 @@ replace github.com/hashicorp/go-hclog => github.com/hashicorp/go-hclog v0.12.2 // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: https://github.com/hashicorp/memberlist/pull/260 -replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b +replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 replace github.com/vimeo/galaxycache => github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e diff --git a/go.sum b/go.sum index e51ec981674..0a3d27f41ba 100644 --- a/go.sum +++ b/go.sum @@ -1047,12 +1047,12 @@ github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 h1:LQAhgcUPnzdjU github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0= +github.com/grafana/dskit v0.0.0-20220426141700-ec983e9d5345 h1:AAXGMQeVpdl6rvhmhWn+UL9Y5ICLZNgffdfvz2Yo6UE= +github.com/grafana/dskit v0.0.0-20220426141700-ec983e9d5345/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0= github.com/grafana/e2e v0.1.0 h1:nThd0U0TjUqyOOupSb+qDd4BOdhqwhR/oYbjoqiMlZk= github.com/grafana/e2e v0.1.0/go.mod h1:+26VJWpczg2OU3D0537acnHSHzhJORpxOs6F+M27tZo= -github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b h1:UlCBLaqvS4wVYNrMKSfqTBVsed/EOY9dnenhYZMUnrA= -github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 h1:PgEQkGHR4YimSCEGT5IoswN9gJKZDVskf+he6UClCLw= +github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/mimir-prometheus v0.0.0-20220425152715-64e6c171c245 h1:/of8oqZo52xJwwF4TDEtEaIebb1MP8hNXpTBQYSQmck= github.com/grafana/mimir-prometheus v0.0.0-20220425152715-64e6c171c245/go.mod h1:W59JUgfj423JtdkiZLvblAJD4IQeE04y26z0CL7DVKc= github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k= diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index ead611a3f97..6ced33aabf9 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -15,7 +15,7 @@ import ( type StartingFn func(serviceContext context.Context) error // RunningFn function is called when service enters Running state. When it returns, service will move to Stopping state. -// If RunningFn or Stopping return error, Service will end in Failed state, otherwise if both functions return without +// If RunningFn or StoppingFn return error, Service will end in Failed state, otherwise if both functions return without // error, service will end in Terminated state. type RunningFn func(serviceContext context.Context) error diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 91876a72815..a4affd841f2 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -101,7 +101,7 @@ func withContext(ctx context.Context, logger log.Logger, resolver TenantResolver // even though the code-base generally uses `userID` to refer to the same thing. userID, err := resolver.TenantID(ctx) if err == nil && userID != "" { - logger = log.With(logger, "org_id", userID) + logger = log.With(logger, "user", userID) } traceID, ok := tracing.ExtractSampledTraceID(ctx) diff --git a/vendor/github.com/hashicorp/memberlist/config.go b/vendor/github.com/hashicorp/memberlist/config.go index 31099e75f44..d7fe4c37b05 100644 --- a/vendor/github.com/hashicorp/memberlist/config.go +++ b/vendor/github.com/hashicorp/memberlist/config.go @@ -21,6 +21,17 @@ type Config struct { // make a NetTransport using BindAddr and BindPort from this structure. Transport Transport + // Label is an optional set of bytes to include on the outside of each + // packet and stream. + // + // If gossip encryption is enabled and this is set it is treated as GCM + // authenticated data. + Label string + + // SkipInboundLabelCheck skips the check that inbound packets and gossip + // streams need to be label prefixed. + SkipInboundLabelCheck bool + // Configuration related to what address to bind to and ports to // listen on. The port is used for both UDP and TCP gossip. It is // assumed other nodes are running on this port, but they do not need diff --git a/vendor/github.com/hashicorp/memberlist/label.go b/vendor/github.com/hashicorp/memberlist/label.go new file mode 100644 index 00000000000..bbe0163ab64 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/label.go @@ -0,0 +1,178 @@ +package memberlist + +import ( + "bufio" + "fmt" + "io" + "net" +) + +// General approach is to prefix all packets and streams with the same structure: +// +// magic type byte (244): uint8 +// length of label name: uint8 (because labels can't be longer than 255 bytes) +// label name: []uint8 + +// LabelMaxSize is the maximum length of a packet or stream label. +const LabelMaxSize = 255 + +// AddLabelHeaderToPacket prefixes outgoing packets with the correct header if +// the label is not empty. +func AddLabelHeaderToPacket(buf []byte, label string) ([]byte, error) { + if label == "" { + return buf, nil + } + if len(label) > LabelMaxSize { + return nil, fmt.Errorf("label %q is too long", label) + } + + return makeLabelHeader(label, buf), nil +} + +// RemoveLabelHeaderFromPacket removes any label header from the provided +// packet and returns it along with the remaining packet contents. +func RemoveLabelHeaderFromPacket(buf []byte) (newBuf []byte, label string, err error) { + if len(buf) == 0 { + return buf, "", nil // can't possibly be labeled + } + + // [type:byte] [size:byte] [size bytes] + + msgType := messageType(buf[0]) + if msgType != hasLabelMsg { + return buf, "", nil + } + + if len(buf) < 2 { + return nil, "", fmt.Errorf("cannot decode label; packet has been truncated") + } + + size := int(buf[1]) + if size < 1 { + return nil, "", fmt.Errorf("label header cannot be empty when present") + } + + if len(buf) < 2+size { + return nil, "", fmt.Errorf("cannot decode label; packet has been truncated") + } + + label = string(buf[2 : 2+size]) + newBuf = buf[2+size:] + + return newBuf, label, nil +} + +// AddLabelHeaderToStream prefixes outgoing streams with the correct header if +// the label is not empty. +func AddLabelHeaderToStream(conn net.Conn, label string) error { + if label == "" { + return nil + } + if len(label) > LabelMaxSize { + return fmt.Errorf("label %q is too long", label) + } + + header := makeLabelHeader(label, nil) + + _, err := conn.Write(header) + return err +} + +// RemoveLabelHeaderFromStream removes any label header from the beginning of +// the stream if present and returns it along with an updated conn with that +// header removed. +// +// Note that on error it is the caller's responsibility to close the +// connection. +func RemoveLabelHeaderFromStream(conn net.Conn) (net.Conn, string, error) { + br := bufio.NewReader(conn) + + // First check for the type byte. + peeked, err := br.Peek(1) + if err != nil { + if err == io.EOF { + // It is safe to return the original net.Conn at this point because + // it never contained any data in the first place so we don't have + // to splice the buffer into the conn because both are empty. + return conn, "", nil + } + return nil, "", err + } + + msgType := messageType(peeked[0]) + if msgType != hasLabelMsg { + conn, err = newPeekedConnFromBufferedReader(conn, br, 0) + return conn, "", err + } + + // We are guaranteed to get a size byte as well. + peeked, err = br.Peek(2) + if err != nil { + if err == io.EOF { + return nil, "", fmt.Errorf("cannot decode label; stream has been truncated") + } + return nil, "", err + } + + size := int(peeked[1]) + if size < 1 { + return nil, "", fmt.Errorf("label header cannot be empty when present") + } + // NOTE: we don't have to check this against LabelMaxSize because a byte + // already has a max value of 255. + + // Once we know the size we can peek the label as well. Note that since we + // are using the default bufio.Reader size of 4096, the entire label header + // fits in the initial buffer fill so this should be free. + peeked, err = br.Peek(2 + size) + if err != nil { + if err == io.EOF { + return nil, "", fmt.Errorf("cannot decode label; stream has been truncated") + } + return nil, "", err + } + + label := string(peeked[2 : 2+size]) + + conn, err = newPeekedConnFromBufferedReader(conn, br, 2+size) + if err != nil { + return nil, "", err + } + + return conn, label, nil +} + +// newPeekedConnFromBufferedReader will splice the buffer contents after the +// offset into the provided net.Conn and return the result so that the rest of +// the buffer contents are returned first when reading from the returned +// peekedConn before moving on to the unbuffered conn contents. +func newPeekedConnFromBufferedReader(conn net.Conn, br *bufio.Reader, offset int) (*peekedConn, error) { + // Extract any of the readahead buffer. + peeked, err := br.Peek(br.Buffered()) + if err != nil { + return nil, err + } + + return &peekedConn{ + Peeked: peeked[offset:], + Conn: conn, + }, nil +} + +func makeLabelHeader(label string, rest []byte) []byte { + newBuf := make([]byte, 2, 2+len(label)+len(rest)) + newBuf[0] = byte(hasLabelMsg) + newBuf[1] = byte(len(label)) + newBuf = append(newBuf, []byte(label)...) + if len(rest) > 0 { + newBuf = append(newBuf, []byte(rest)...) + } + return newBuf +} + +func labelOverhead(label string) int { + if label == "" { + return 0 + } + return 2 + len(label) +} diff --git a/vendor/github.com/hashicorp/memberlist/memberlist.go b/vendor/github.com/hashicorp/memberlist/memberlist.go index 7ee04009191..cab6db69fd4 100644 --- a/vendor/github.com/hashicorp/memberlist/memberlist.go +++ b/vendor/github.com/hashicorp/memberlist/memberlist.go @@ -187,6 +187,17 @@ func newMemberlist(conf *Config) (*Memberlist, error) { nodeAwareTransport = &shimNodeAwareTransport{transport} } + if len(conf.Label) > LabelMaxSize { + return nil, fmt.Errorf("could not use %q as a label: too long", conf.Label) + } + + if conf.Label != "" { + nodeAwareTransport = &labelWrappedTransport{ + label: conf.Label, + NodeAwareTransport: nodeAwareTransport, + } + } + m := &Memberlist{ config: conf, shutdownCh: make(chan struct{}), @@ -262,7 +273,7 @@ func (m *Memberlist) Join(existing []string) (int, error) { hp := joinHostPort(addr.ip.String(), addr.port) a := Address{Addr: hp, Name: addr.nodeName} if err := m.pushPullNode(a, true); err != nil { - err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) + err = fmt.Errorf("Failed to join %s: %v", a.Addr, err) errs = multierror.Append(errs, err) m.logger.Printf("[DEBUG] memberlist: %v", err) continue diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index fe4acbc2e1e..66c1dcd94d2 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -7,6 +7,7 @@ import ( "fmt" "hash/crc32" "io" + "math" "net" "sync/atomic" "time" @@ -42,6 +43,9 @@ const ( type messageType uint8 // The list of available message types. +// +// WARNING: ONLY APPEND TO THIS LIST! The numeric values are part of the +// protocol itself. const ( pingMsg messageType = iota indirectPingMsg @@ -59,6 +63,13 @@ const ( errMsg ) +const ( + // hasLabelMsg has a deliberately high value so that you can disambiguate + // it from the encryptionVersion header which is either 0/1 right now and + // also any of the existing messageTypes + hasLabelMsg messageType = 244 +) + // compressionType is used to specify the compression algorithm type compressionType uint8 @@ -226,7 +237,32 @@ func (m *Memberlist) handleConn(conn net.Conn) { metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1) conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) - msgType, bufConn, dec, err := m.readStream(conn) + + var ( + streamLabel string + err error + ) + conn, streamLabel, err = RemoveLabelHeaderFromStream(conn) + if err != nil { + m.logger.Printf("[ERR] memberlist: failed to receive and remove the stream label header: %s %s", err, LogConn(conn)) + return + } + + if m.config.SkipInboundLabelCheck { + if streamLabel != "" { + m.logger.Printf("[ERR] memberlist: unexpected double stream label header: %s", LogConn(conn)) + return + } + // Set this from config so that the auth data assertions work below. + streamLabel = m.config.Label + } + + if m.config.Label != streamLabel { + m.logger.Printf("[ERR] memberlist: discarding stream with unacceptable label %q: %s", streamLabel, LogConn(conn)) + return + } + + msgType, bufConn, dec, err := m.readStream(conn, streamLabel) if err != nil { if err != io.EOF { m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn)) @@ -238,7 +274,7 @@ func (m *Memberlist) handleConn(conn net.Conn) { return } - err = m.rawSendMsgStream(conn, out.Bytes()) + err = m.rawSendMsgStream(conn, out.Bytes(), streamLabel) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn)) return @@ -269,7 +305,7 @@ func (m *Memberlist) handleConn(conn net.Conn) { return } - if err := m.sendLocalState(conn, join); err != nil { + if err := m.sendLocalState(conn, join, streamLabel); err != nil { m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn)) return } @@ -297,7 +333,7 @@ func (m *Memberlist) handleConn(conn net.Conn) { return } - err = m.rawSendMsgStream(conn, out.Bytes()) + err = m.rawSendMsgStream(conn, out.Bytes(), streamLabel) if err != nil { m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn)) return @@ -322,10 +358,35 @@ func (m *Memberlist) packetListen() { } func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) { + var ( + packetLabel string + err error + ) + buf, packetLabel, err = RemoveLabelHeaderFromPacket(buf) + if err != nil { + m.logger.Printf("[ERR] memberlist: %v %s", err, LogAddress(from)) + return + } + + if m.config.SkipInboundLabelCheck { + if packetLabel != "" { + m.logger.Printf("[ERR] memberlist: unexpected double packet label header: %s", LogAddress(from)) + return + } + // Set this from config so that the auth data assertions work below. + packetLabel = m.config.Label + } + + if m.config.Label != packetLabel { + m.logger.Printf("[ERR] memberlist: discarding packet with unacceptable label %q: %s", packetLabel, LogAddress(from)) + return + } + // Check if encryption is enabled if m.config.EncryptionEnabled() { // Decrypt the payload - plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil) + authData := []byte(packetLabel) + plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, authData) if err != nil { if !m.config.GossipVerifyIncoming { // Treat the message as plaintext @@ -723,7 +784,7 @@ func (m *Memberlist) encodeAndSendMsg(a Address, msgType messageType, msg interf // opportunistically create a compoundMsg and piggy back other broadcasts. func (m *Memberlist) sendMsg(a Address, msg []byte) error { // Check if we can piggy back any messages - bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead + bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead - labelOverhead(m.config.Label) if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { bytesAvail -= encryptOverhead(m.encryptionVersion()) } @@ -801,9 +862,12 @@ func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error { // Check if we have encryption enabled if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { // Encrypt the payload - var buf bytes.Buffer - primaryKey := m.config.Keyring.GetPrimaryKey() - err := encryptPayload(m.encryptionVersion(), primaryKey, msg, nil, &buf) + var ( + primaryKey = m.config.Keyring.GetPrimaryKey() + packetLabel = []byte(m.config.Label) + buf bytes.Buffer + ) + err := encryptPayload(m.encryptionVersion(), primaryKey, msg, packetLabel, &buf) if err != nil { m.logger.Printf("[ERR] memberlist: Encryption of message failed: %v", err) return err @@ -818,7 +882,7 @@ func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error { // rawSendMsgStream is used to stream a message to another host without // modification, other than applying compression and encryption if enabled. -func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error { +func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel string) error { // Check if compression is enabled if m.config.EnableCompression { compBuf, err := compressPayload(sendBuf) @@ -831,7 +895,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error { // Check if encryption is enabled if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { - crypt, err := m.encryptLocalState(sendBuf) + crypt, err := m.encryptLocalState(sendBuf, streamLabel) if err != nil { m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err) return err @@ -877,7 +941,8 @@ func (m *Memberlist) sendUserMsg(a Address, sendBuf []byte) error { if _, err := bufConn.Write(sendBuf); err != nil { return err } - return m.rawSendMsgStream(conn, bufConn.Bytes()) + + return m.rawSendMsgStream(conn, bufConn.Bytes(), m.config.Label) } // sendAndReceiveState is used to initiate a push/pull over a stream with a @@ -897,12 +962,12 @@ func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1) // Send our state - if err := m.sendLocalState(conn, join); err != nil { + if err := m.sendLocalState(conn, join, m.config.Label); err != nil { return nil, nil, err } conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) - msgType, bufConn, dec, err := m.readStream(conn) + msgType, bufConn, dec, err := m.readStream(conn, m.config.Label) if err != nil { return nil, nil, err } @@ -927,7 +992,7 @@ func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, } // sendLocalState is invoked to send our local state over a stream connection. -func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { +func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string) error { // Setup a deadline conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) @@ -984,11 +1049,11 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error { } // Get the send buffer - return m.rawSendMsgStream(conn, bufConn.Bytes()) + return m.rawSendMsgStream(conn, bufConn.Bytes(), streamLabel) } // encryptLocalState is used to help encrypt local state before sending -func (m *Memberlist) encryptLocalState(sendBuf []byte) ([]byte, error) { +func (m *Memberlist) encryptLocalState(sendBuf []byte, streamLabel string) ([]byte, error) { var buf bytes.Buffer // Write the encryptMsg byte @@ -1001,9 +1066,15 @@ func (m *Memberlist) encryptLocalState(sendBuf []byte) ([]byte, error) { binary.BigEndian.PutUint32(sizeBuf, uint32(encLen)) buf.Write(sizeBuf) + // Authenticated Data is: + // + // [messageType; byte] [messageLength; uint32] [stream_label; optional] + // + dataBytes := appendBytes(buf.Bytes()[:5], []byte(streamLabel)) + // Write the encrypted cipher text to the buffer key := m.config.Keyring.GetPrimaryKey() - err := encryptPayload(encVsn, key, sendBuf, buf.Bytes()[:5], &buf) + err := encryptPayload(encVsn, key, sendBuf, dataBytes, &buf) if err != nil { return nil, err } @@ -1011,7 +1082,7 @@ func (m *Memberlist) encryptLocalState(sendBuf []byte) ([]byte, error) { } // decryptRemoteState is used to help decrypt the remote state -func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) { +func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) ([]byte, error) { // Read in enough to determine message length cipherText := bytes.NewBuffer(nil) cipherText.WriteByte(byte(encryptMsg)) @@ -1025,6 +1096,12 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) { moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5]) if moreBytes > maxPushStateBytes { return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes) + + } + + //Start reporting the size before you cross the limit + if moreBytes > uint32(math.Floor(.6*maxPushStateBytes)) { + m.logger.Printf("[WARN] memberlist: Remote node state size is (%d) limit is (%d)", moreBytes, maxPushStateBytes) } // Read in the rest of the payload @@ -1033,8 +1110,13 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) { return nil, err } - // Decrypt the cipherText - dataBytes := cipherText.Bytes()[:5] + // Decrypt the cipherText with some authenticated data + // + // Authenticated Data is: + // + // [messageType; byte] [messageLength; uint32] [label_data; optional] + // + dataBytes := appendBytes(cipherText.Bytes()[:5], []byte(streamLabel)) cipherBytes := cipherText.Bytes()[5:] // Decrypt the payload @@ -1042,15 +1124,18 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) { return decryptPayload(keys, cipherBytes, dataBytes) } -// readStream is used to read from a stream connection, decrypting and +// readStream is used to read messages from a stream connection, decrypting and // decompressing the stream if necessary. -func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) { +// +// The provided streamLabel if present will be authenticated during decryption +// of each message. +func (m *Memberlist) readStream(conn net.Conn, streamLabel string) (messageType, io.Reader, *codec.Decoder, error) { // Created a buffered reader var bufConn io.Reader = bufio.NewReader(conn) // Read the message type buf := [1]byte{0} - if _, err := bufConn.Read(buf[:]); err != nil { + if _, err := io.ReadFull(bufConn, buf[:]); err != nil { return 0, nil, nil, err } msgType := messageType(buf[0]) @@ -1062,7 +1147,7 @@ func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.D fmt.Errorf("Remote state is encrypted and encryption is not configured") } - plain, err := m.decryptRemoteState(bufConn) + plain, err := m.decryptRemoteState(bufConn, streamLabel) if err != nil { return 0, nil, nil, err } @@ -1242,11 +1327,11 @@ func (m *Memberlist) sendPingAndWaitForAck(a Address, ping ping, deadline time.T return false, err } - if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil { + if err = m.rawSendMsgStream(conn, out.Bytes(), m.config.Label); err != nil { return false, err } - msgType, _, dec, err := m.readStream(conn) + msgType, _, dec, err := m.readStream(conn, m.config.Label) if err != nil { return false, err } diff --git a/vendor/github.com/hashicorp/memberlist/peeked_conn.go b/vendor/github.com/hashicorp/memberlist/peeked_conn.go new file mode 100644 index 00000000000..3181d90cec0 --- /dev/null +++ b/vendor/github.com/hashicorp/memberlist/peeked_conn.go @@ -0,0 +1,48 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Originally from: https://github.com/google/tcpproxy/blob/master/tcpproxy.go +// at f5c09fbedceb69e4b238dec52cdf9f2fe9a815e2 + +package memberlist + +import "net" + +// peekedConn is an incoming connection that has had some bytes read from it +// to determine how to route the connection. The Read method stitches +// the peeked bytes and unread bytes back together. +type peekedConn struct { + // Peeked are the bytes that have been read from Conn for the + // purposes of route matching, but have not yet been consumed + // by Read calls. It set to nil by Read when fully consumed. + Peeked []byte + + // Conn is the underlying connection. + // It can be type asserted against *net.TCPConn or other types + // as needed. It should not be read from directly unless + // Peeked is nil. + net.Conn +} + +func (c *peekedConn) Read(p []byte) (n int, err error) { + if len(c.Peeked) > 0 { + n = copy(p, c.Peeked) + c.Peeked = c.Peeked[n:] + if len(c.Peeked) == 0 { + c.Peeked = nil + } + return n, nil + } + return c.Conn.Read(p) +} diff --git a/vendor/github.com/hashicorp/memberlist/security.go b/vendor/github.com/hashicorp/memberlist/security.go index 4cb4da36f05..6831be3bc62 100644 --- a/vendor/github.com/hashicorp/memberlist/security.go +++ b/vendor/github.com/hashicorp/memberlist/security.go @@ -199,3 +199,22 @@ func decryptPayload(keys [][]byte, msg []byte, data []byte) ([]byte, error) { return nil, fmt.Errorf("No installed keys could decrypt the message") } + +func appendBytes(first []byte, second []byte) []byte { + hasFirst := len(first) > 0 + hasSecond := len(second) > 0 + + switch { + case hasFirst && hasSecond: + out := make([]byte, 0, len(first)+len(second)) + out = append(out, first...) + out = append(out, second...) + return out + case hasFirst: + return first + case hasSecond: + return second + default: + return nil + } +} diff --git a/vendor/github.com/hashicorp/memberlist/state.go b/vendor/github.com/hashicorp/memberlist/state.go index 95e6ddc48ea..7a2339e9b02 100644 --- a/vendor/github.com/hashicorp/memberlist/state.go +++ b/vendor/github.com/hashicorp/memberlist/state.go @@ -274,6 +274,11 @@ func failedRemote(err error) bool { case "dial", "read", "write": return true } + } else if strings.HasPrefix(t.Net, "udp") { + switch t.Op { + case "write": + return true + } } } return false @@ -324,7 +329,7 @@ func (m *Memberlist) probeNode(node *nodeState) { }() if node.State == StateAlive { if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) + m.logger.Printf("[ERR] memberlist: Failed to send UDP ping: %s", err) if failedRemote(err) { goto HANDLE_REMOTE_FAILURE } else { @@ -334,7 +339,7 @@ func (m *Memberlist) probeNode(node *nodeState) { } else { var msgs [][]byte if buf, err := encode(pingMsg, &ping); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err) + m.logger.Printf("[ERR] memberlist: Failed to encode UDP ping message: %s", err) return } else { msgs = append(msgs, buf.Bytes()) @@ -349,7 +354,7 @@ func (m *Memberlist) probeNode(node *nodeState) { compound := makeCompoundMessage(msgs) if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err) + m.logger.Printf("[ERR] memberlist: Failed to send UDP compound ping and suspect message to %s: %s", addr, err) if failedRemote(err) { goto HANDLE_REMOTE_FAILURE } else { @@ -388,7 +393,7 @@ func (m *Memberlist) probeNode(node *nodeState) { // probe interval it will give the TCP fallback more time, which // is more active in dealing with lost packets, and it gives more // time to wait for indirect acks/nacks. - m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name) + m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %s (timeout reached)", node.Name) } HANDLE_REMOTE_FAILURE: @@ -421,7 +426,7 @@ HANDLE_REMOTE_FAILURE: } if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil { - m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) + m.logger.Printf("[ERR] memberlist: Failed to send indirect UDP ping: %s", err) } } @@ -444,7 +449,11 @@ HANDLE_REMOTE_FAILURE: defer close(fallbackCh) didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline) if err != nil { - m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err) + var to string + if ne, ok := err.(net.Error); ok && ne.Timeout() { + to = fmt.Sprintf("timeout %s: ", probeInterval) + } + m.logger.Printf("[ERR] memberlist: Failed fallback TCP ping: %s%s", to, err) } else { fallbackCh <- didContact } @@ -469,7 +478,7 @@ HANDLE_REMOTE_FAILURE: // any additional time here. for didContact := range fallbackCh { if didContact { - m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name) + m.logger.Printf("[WARN] memberlist: Was able to connect to %s over TCP but UDP probes failed, network may be misconfigured", node.Name) return } } @@ -587,7 +596,7 @@ func (m *Memberlist) gossip() { m.nodeLock.RUnlock() // Compute the bytes available - bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead + bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead - labelOverhead(m.config.Label) if m.config.EncryptionEnabled() { bytesAvail -= encryptOverhead(m.encryptionVersion()) } diff --git a/vendor/github.com/hashicorp/memberlist/transport.go b/vendor/github.com/hashicorp/memberlist/transport.go index b23b83914b9..f3d05364d73 100644 --- a/vendor/github.com/hashicorp/memberlist/transport.go +++ b/vendor/github.com/hashicorp/memberlist/transport.go @@ -111,3 +111,50 @@ func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Ti func (t *shimNodeAwareTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) { return t.DialTimeout(addr.Addr, timeout) } + +type labelWrappedTransport struct { + label string + NodeAwareTransport +} + +var _ NodeAwareTransport = (*labelWrappedTransport)(nil) + +func (t *labelWrappedTransport) WriteToAddress(buf []byte, addr Address) (time.Time, error) { + var err error + buf, err = AddLabelHeaderToPacket(buf, t.label) + if err != nil { + return time.Time{}, fmt.Errorf("failed to add label header to packet: %w", err) + } + return t.NodeAwareTransport.WriteToAddress(buf, addr) +} + +func (t *labelWrappedTransport) WriteTo(buf []byte, addr string) (time.Time, error) { + var err error + buf, err = AddLabelHeaderToPacket(buf, t.label) + if err != nil { + return time.Time{}, err + } + return t.NodeAwareTransport.WriteTo(buf, addr) +} + +func (t *labelWrappedTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) { + conn, err := t.NodeAwareTransport.DialAddressTimeout(addr, timeout) + if err != nil { + return nil, err + } + if err := AddLabelHeaderToStream(conn, t.label); err != nil { + return nil, fmt.Errorf("failed to add label header to stream: %w", err) + } + return conn, nil +} + +func (t *labelWrappedTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) { + conn, err := t.NodeAwareTransport.DialTimeout(addr, timeout) + if err != nil { + return nil, err + } + if err := AddLabelHeaderToStream(conn, t.label); err != nil { + return nil, fmt.Errorf("failed to add label header to stream: %w", err) + } + return conn, nil +} diff --git a/vendor/github.com/hashicorp/memberlist/util.go b/vendor/github.com/hashicorp/memberlist/util.go index e7be4ad88eb..8f609c1e0f9 100644 --- a/vendor/github.com/hashicorp/memberlist/util.go +++ b/vendor/github.com/hashicorp/memberlist/util.go @@ -96,13 +96,13 @@ func pushPullScale(interval time.Duration, n int) time.Duration { return time.Duration(multiplier) * interval } -// moveDeadNodes moves nodes that are dead and beyond the gossip to the dead interval +// moveDeadNodes moves dead and left nodes that that have not changed during the gossipToTheDeadTime interval // to the end of the slice and returns the index of the first moved node. func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int { numDead := 0 n := len(nodes) for i := 0; i < n-numDead; i++ { - if nodes[i].State != StateDead { + if !nodes[i].DeadOrLeft() { continue } diff --git a/vendor/modules.txt b/vendor/modules.txt index 28bb67fb1bc..52176bd5949 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -434,7 +434,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca +# github.com/grafana/dskit v0.0.0-20220426141700-ec983e9d5345 ## explicit; go 1.17 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency @@ -515,7 +515,7 @@ github.com/hashicorp/go-sockaddr ## explicit; go 1.12 github.com/hashicorp/golang-lru github.com/hashicorp/golang-lru/simplelru -# github.com/hashicorp/memberlist v0.3.1 => github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b +# github.com/hashicorp/memberlist v0.3.1 => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 ## explicit; go 1.12 github.com/hashicorp/memberlist # github.com/hashicorp/serf v0.9.6 @@ -1230,5 +1230,5 @@ gopkg.in/yaml.v3 # github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220425152715-64e6c171c245 # github.com/hashicorp/go-immutable-radix => github.com/hashicorp/go-immutable-radix v1.2.0 # github.com/hashicorp/go-hclog => github.com/hashicorp/go-hclog v0.12.2 -# github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b +# github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 # github.com/vimeo/galaxycache => github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e From d751bd5c49505461ab1f6e3be0ff136c81b3370d Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Tue, 26 Apr 2022 11:58:45 -0400 Subject: [PATCH 2/2] Update changelog Signed-off-by: Nick Pillitteri --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4adc26a6650..36b94ca37eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ * [CHANGE] Query-frontend: results cache keys are now versioned, this will cause cache to be re-filled when rolling out this version. #1631 * [CHANGE] Store-gateway: enabled attributes in-memory cache by default. New default configuration is `-blocks-storage.bucket-store.chunks-cache.attributes-in-memory-max-items=50000`. #1727 * [CHANGE] Compactor: Removed the metric `cortex_compactor_garbage_collected_blocks_total` since it duplicates `cortex_compactor_blocks_marked_for_deletion_total`. #1728 -* [CHANGE] All: Logs that used the`org_id` label now use `user` label. #1634 +* [CHANGE] All: Logs that used the`org_id` label now use `user` label. #1634 #1758 * [FEATURE] Ruler: Allow setting `evaluation_delay` for each rule group via rules group configuration file. #1474 * [FEATURE] Ruler: Added support for expression remote evaluation. #1536 * The following CLI flags (and their respective YAML config options) have been added: