From c65f9dab8118295e49a7b863f59cb64ace4c2d5b Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 10 Aug 2022 11:42:51 -0500 Subject: [PATCH] fix(bigquery/storage/managedwriter): propagate calloptions to append (#6488) * fix(bigquery/storage/managedwriter): propagate calloptions to append Reporter identified that we're not properly propagating user-specified call options when we open the AppendRows bidi stream. Fixes: https://github.com/googleapis/google-cloud-go/issues/6487 --- bigquery/storage/managedwriter/client.go | 23 +++++++++++------- bigquery/storage/managedwriter/client_test.go | 4 +++- .../managedwriter/managed_stream_test.go | 24 +++++++++++++++++++ 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 035ffb7f7f57..bc94e3182df3 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -92,6 +92,19 @@ func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*M return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...) } +// createOpenF builds the opener function we need to access the AppendRows bidi stream. +func createOpenF(ctx context.Context, streamFunc streamClientFunc) func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { + return func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { + arc, err := streamFunc( + // Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually. + metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)), opts...) + if err != nil { + return nil, err + } + return arc, nil + } +} + func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) { ctx, cancel := context.WithCancel(ctx) @@ -103,15 +116,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient callOptions: []gax.CallOption{ gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), }, - open: func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { - arc, err := streamFunc( - // Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually. - metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID))) - if err != nil { - return nil, err - } - return arc, nil - }, + open: createOpenF(ctx, streamFunc), } // apply writer options diff --git a/bigquery/storage/managedwriter/client_test.go b/bigquery/storage/managedwriter/client_test.go index 2183f4c6c794..a197c52fb6d0 100644 --- a/bigquery/storage/managedwriter/client_test.go +++ b/bigquery/storage/managedwriter/client_test.go @@ -14,7 +14,9 @@ package managedwriter -import "testing" +import ( + "testing" +) func TestTableParentFromStreamName(t *testing.T) { testCases := []struct { diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index c36ed0c150e1..894e6cf021a4 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -17,12 +17,15 @@ package managedwriter import ( "context" "errors" + "fmt" "runtime" "testing" "time" "github.com/googleapis/gax-go/v2" + "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -420,3 +423,24 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) { } } } + +// Ensures we're propagating call options as expected. +// Background: https://github.com/googleapis/google-cloud-go/issues/6487 +func TestOpenCallOptionPropagation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + ms := &ManagedStream{ + ctx: ctx, + callOptions: []gax.CallOption{ + gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)), + }, + open: createOpenF(ctx, func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) { + if len(opts) == 0 { + t.Fatalf("no options were propagated") + } + return nil, fmt.Errorf("no real client") + }), + } + ms.openWithRetry() +}