Skip to content

Commit

Permalink
Revert upgrade to google.golang.org/grpc v1.66.2 (#9811)
Browse files Browse the repository at this point in the history
* Revert "Don't free buffers after reading query stream (#9721)"

This reverts commit f7b6017.

* Revert: Upgrade to google.golang.org/grpc v1.66.2 / modify certain protobuf messages to retain their unmarshaling buffer (#9401)

Signed-off-by: Yuri Nikolic <[email protected]>

* Revert "Distributor.queryIngesterStream: Free gRPC buffers upon error (#9756)"

This reverts commit eda1a4b.

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Nov 4, 2024
1 parent 5e1e6b2 commit 8e90ec1
Show file tree
Hide file tree
Showing 96 changed files with 2,504 additions and 4,002 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
golang.org/x/net v0.30.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
google.golang.org/grpc v1.66.2
google.golang.org/grpc v1.66.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -316,3 +316,7 @@ replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-aler
// - https://github.com/grafana/franz-go/pull/3
// - https://github.com/grafana/franz-go/pull/4
replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
1,182 changes: 1,121 additions & 61 deletions go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,6 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := NewParsedRequest(req)
pushReq.AddCleanup(func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})

Expand Down
26 changes: 3 additions & 23 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,6 @@ type ingesterQueryResult struct {
chunkseriesBatches [][]ingester_client.TimeSeriesChunk
timeseriesBatches [][]mimirpb.TimeSeries
streamingSeries seriesChunksStream

// Retain responses owning referenced gRPC buffers, until they are freed.
responses []*ingester_client.QueryStreamResponse
}

func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) {
r.responses = append(r.responses, resp)
}

func (r *ingesterQueryResult) freeBuffers() {
for _, resp := range r.responses {
resp.FreeBuffer()
}
r.responses = nil
}

// queryIngesterStream queries the ingesters using the gRPC streaming API.
Expand All @@ -229,7 +215,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

// queryIngester MUST call cancelContext once processing is completed in order to release resources. It's required
// by ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation() to properly release resources.
queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (result ingesterQueryResult, err error) {
queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (ingesterQueryResult, error) {
log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.queryIngesterStream")
cleanup := func() {
log.Span.Finish()
Expand All @@ -248,10 +234,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

cleanup()
}

if err != nil {
result.freeBuffers()
}
}()

log.Span.SetTag("ingester_address", ing.Addr)
Expand All @@ -267,15 +249,15 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

result := ingesterQueryResult{}

// Why retain the batches rather than iteratively build a single slice?
// If we iteratively build a single slice, we'll spend a lot of time copying elements as the slice grows beyond its capacity.
// So instead, we build the slice in one go once we know how many series we have.
var streamingSeriesBatches [][]labels.Labels
streamingSeriesCount := 0

for {
// XXX: Note that while we free responses' gRPC buffers on error, we don't do the same in case of success,
// as the combined response retains references to gRPC buffers.
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
// We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here.
Expand All @@ -284,8 +266,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

result.addResponse(resp)

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
Expand Down
3 changes: 0 additions & 3 deletions pkg/frontend/querymiddleware/model.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions pkg/frontend/querymiddleware/model.pb.go.expdiff

This file was deleted.

23 changes: 3 additions & 20 deletions pkg/ingester/client/buffering_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -70,12 +68,7 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
require.Equal(t, requestsToSend, reqs)
})

t.Run("push with pooling", func(t *testing.T) {
Expand All @@ -92,12 +85,7 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
require.Equal(t, requestsToSend, reqs)

// Verify that pool was used.
require.Greater(t, pool.Gets.Load(), int64(0))
Expand Down Expand Up @@ -161,12 +149,7 @@ func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T)
_, err := bufferingClient.Push(ctx, req)
require.NoError(t, err)

diff := cmp.Diff([]*mimirpb.WriteRequest{req}, serv.requests(), cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
require.Equal(t, serv.requests(), []*mimirpb.WriteRequest{req})

// Verify that all buffers from the pool were returned.
require.Greater(t, pool.Gets.Load(), int64(0))
Expand Down
15 changes: 0 additions & 15 deletions pkg/ingester/client/ingester.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 0 additions & 54 deletions pkg/ingester/client/ingester.pb.go.expdiff

This file was deleted.

5 changes: 1 addition & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3858,10 +3858,7 @@ func (i *Ingester) checkAvailableForPush() error {

// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
err := i.PushWithCleanup(ctx, req, func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
if err != nil {
return mapPushErrorToErrorWithStatus(err)
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3292,10 +3292,8 @@ func TestIngester_Push(t *testing.T) {

// Push timeseries
for idx, req := range testData.reqs {
// Push metrics to the ingester.
err := i.PushWithCleanup(ctx, req, func() {
req.FreeBuffer()
})
// Push metrics to the ingester. Override the default cleanup method of mimirpb.ReuseSlice with a no-op one.
err := i.PushWithCleanup(ctx, req, func() {})

// We expect no error on any request except the last one
// which may error (and in that case we assert on it)
Expand Down Expand Up @@ -5532,7 +5530,7 @@ func TestIngester_QueryStream_StreamingWithManySamples(t *testing.T) {
IsEndOfSeriesStream: true,
}

require.EqualExportedValues(t, seriesLabelsMsg, *resp)
require.Equal(t, seriesLabelsMsg, *resp)

recvMsgs := 0
series := 0
Expand Down
76 changes: 0 additions & 76 deletions pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,84 +8,8 @@ import (
"math"

"github.com/prometheus/prometheus/model/histogram"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
protobufproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)

// init swaps gRPC's registered proto codec for a wrapper that can hand the
// unmarshalling buffer to messages embedding BufferHolder (see codecV2.Unmarshal).
func init() {
	encoding.RegisterCodecV2(&codecV2{codec: encoding.GetCodecV2(proto.Name)})
}

// codecV2 customizes gRPC unmarshalling.
// It delegates to the wrapped codec for everything except Unmarshal, where it
// optionally retains the decoding buffer for messages that support it.
type codecV2 struct {
	// codec is the codec previously registered for proto.Name, used as fallback.
	codec encoding.CodecV2
}

// Compile-time check that codecV2 satisfies the gRPC CodecV2 interface.
var _ encoding.CodecV2 = &codecV2{}

// messageV2Of adapts v to a protobuf API v2 message, converting legacy (v1)
// messages as needed. It panics if v is neither a v1 nor a v2 message.
func messageV2Of(v any) protobufproto.Message {
	// Check v1 first, mirroring the priority of the original conversion.
	if m, ok := v.(protoadapt.MessageV1); ok {
		return protoadapt.MessageV2Of(m)
	}
	if m, ok := v.(protoadapt.MessageV2); ok {
		return m
	}
	panic(fmt.Errorf("unrecognized message type %T", v))
}

// Marshal delegates marshalling unchanged to the wrapped codec;
// only Unmarshal is customized by codecV2.
func (c *codecV2) Marshal(v any) (mem.BufferSlice, error) {
	return c.codec.Marshal(v)
}

// Unmarshal customizes gRPC unmarshalling.
// If v wraps BufferHolder, its SetBuffer method is called with the unmarshalling buffer.
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
	vv := messageV2Of(v)
	// Materialize the (possibly fragmented) buffer slice into one contiguous buffer.
	buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
	// Decrement buf's reference count. Note though that if v wraps BufferHolder,
	// we increase buf's reference count first so it doesn't go to zero.
	defer buf.Free()

	if err := protobufproto.Unmarshal(buf.ReadOnlyData(), vv); err != nil {
		return err
	}

	// If the message can retain its decoding buffer (i.e. it embeds BufferHolder),
	// hand it an extra reference; the message releases it later via FreeBuffer.
	if holder, ok := v.(interface {
		SetBuffer(mem.Buffer)
	}); ok {
		buf.Ref()
		holder.SetBuffer(buf)
	}

	return nil
}

// Name reports the wrapped codec's name, so this codec transparently
// replaces it in gRPC's codec registry.
func (c *codecV2) Name() string {
	return c.codec.Name()
}

// BufferHolder is a base type for protobuf messages that keep unsafe references to the unmarshalling buffer.
// Implementations of this interface should keep a reference to said buffer.
type BufferHolder struct {
	// buffer is the retained gRPC unmarshalling buffer; nil when none is held.
	buffer mem.Buffer
}

// SetBuffer stores a reference to the unmarshalling buffer so the message's
// unsafe views into it stay valid until FreeBuffer is called.
func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
	m.buffer = buf
}

// FreeBuffer releases the retained unmarshalling buffer, if any, and clears
// the reference so that calling FreeBuffer again is a no-op.
func (m *BufferHolder) FreeBuffer() {
	if m.buffer == nil {
		return
	}
	m.buffer.Free()
	m.buffer = nil
}

// MinTimestamp returns the minimum timestamp (milliseconds) among all series
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
func (m *WriteRequest) MinTimestamp() int64 {
Expand Down
Loading

0 comments on commit 8e90ec1

Please sign in to comment.