diff --git a/internal/bigquery/batch/client.go b/internal/bigquery/batch/client.go
index 47ad732..f251692 100644
--- a/internal/bigquery/batch/client.go
+++ b/internal/bigquery/batch/client.go
@@ -23,6 +23,7 @@ import (
 	"github.com/OTA-Insight/bqwriter/log"
 
 	"cloud.google.com/go/bigquery"
+	"google.golang.org/api/option"
 )
 
 var (
@@ -48,12 +49,12 @@ type Client struct {
 }
 
 // NewClient creates a new Client.
-func NewClient(projectID, dataSetID, tableID string, ignoreUnknownValues bool, sourceFormat bigquery.DataFormat, writeDisposition bigquery.TableWriteDisposition, schema *bigquery.Schema, logger log.Logger) (*Client, error) {
+func NewClient(projectID, dataSetID, tableID string, ignoreUnknownValues bool, sourceFormat bigquery.DataFormat, writeDisposition bigquery.TableWriteDisposition, schema *bigquery.Schema, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
 	// NOTE: we are using the background Context,
 	// as to ensure that we can always write to the client,
 	// even when the actual parent context is already done.
 	// This is a requirement given the streamer will batch its rows.
-	client, err := bigquery.NewClient(context.Background(), projectID)
+	client, err := bigquery.NewClient(context.Background(), projectID, opts...)
 	if err != nil {
 		return nil, fmt.Errorf("BQ batch client: creation failed: %w", err)
 	}
diff --git a/internal/bigquery/insertall/client.go b/internal/bigquery/insertall/client.go
index 2e303ee..0acf64b 100644
--- a/internal/bigquery/insertall/client.go
+++ b/internal/bigquery/insertall/client.go
@@ -22,6 +22,8 @@ import (
 	"github.com/OTA-Insight/bqwriter/internal"
 	internalBQ "github.com/OTA-Insight/bqwriter/internal/bigquery"
 	"github.com/OTA-Insight/bqwriter/log"
+
+	"google.golang.org/api/option"
 )
 
 // Client implements the standard/official BQ (cloud) Client,
@@ -49,7 +51,7 @@ type bqClient interface {
 }
 
 // NewClient creates a new Client.
-func NewClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, batchSize int, logger log.Logger) (*Client, error) {
+func NewClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, batchSize int, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
 	if projectID == "" {
 		return nil, fmt.Errorf("bq insertAll client creation: validate projectID: %w: missing", internal.ErrInvalidParam)
 	}
@@ -59,7 +61,7 @@ func NewClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnkn
 	if tableID == "" {
 		return nil, fmt.Errorf("bq insertAll client creation: validate tableID: %w: missing", internal.ErrInvalidParam)
 	}
-	client, err := newStdBQClient(projectID, dataSetID, tableID, skipInvalidRows, ignoreUnknownValues)
+	client, err := newStdBQClient(projectID, dataSetID, tableID, skipInvalidRows, ignoreUnknownValues, opts...)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/bigquery/insertall/client_std.go b/internal/bigquery/insertall/client_std.go
index cffe438..d861917 100644
--- a/internal/bigquery/insertall/client_std.go
+++ b/internal/bigquery/insertall/client_std.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 
 	"cloud.google.com/go/bigquery"
+	"google.golang.org/api/option"
 )
 
 // stdBQClient impements bqClient using the official Golang Gcloud BigQuery API client.
@@ -53,12 +54,12 @@ func (bqc *stdBQClient) Close() error {
 
 // newStdBQClient creates a new Client,
 // a production-ready implementation of bqClient.
-func newStdBQClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool) (*stdBQClient, error) {
+func newStdBQClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, opts ...option.ClientOption) (*stdBQClient, error) {
 	// NOTE: we are using the background Context,
 	// as to ensure that we can always write to the client,
 	// even when the actual parent context is already done.
 	// This is a requirement given the streamer will batch its rows.
-	client, err := bigquery.NewClient(context.Background(), projectID)
+	client, err := bigquery.NewClient(context.Background(), projectID, opts...)
 	if err != nil {
 		return nil, fmt.Errorf("create BQ Insert All Client: %w", err)
 	}
diff --git a/internal/bigquery/storage/client.go b/internal/bigquery/storage/client.go
index 6e632ad..fbc6d09 100644
--- a/internal/bigquery/storage/client.go
+++ b/internal/bigquery/storage/client.go
@@ -23,9 +23,11 @@ import (
 
 	"github.com/OTA-Insight/bqwriter/internal"
 	"github.com/OTA-Insight/bqwriter/internal/bigquery/storage/encoding"
+
 	"github.com/OTA-Insight/bqwriter/log"
 
 	"cloud.google.com/go/bigquery/storage/managedwriter"
+	"google.golang.org/api/option"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/descriptorpb"
@@ -58,7 +60,7 @@ type Client struct {
 
 // NewClient creates a new BQ Storage Client.
 // See the documentation of Client for more information how to use it.
-func NewClient(projectID, dataSetID, tableID string, encoder encoding.Encoder, dp *descriptorpb.DescriptorProto, logger log.Logger) (*Client, error) {
+func NewClient(projectID, dataSetID, tableID string, encoder encoding.Encoder, dp *descriptorpb.DescriptorProto, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
 	if projectID == "" {
 		return nil, fmt.Errorf("bq storage client creation: validate projectID: %w: missing", internal.ErrInvalidParam)
 	}
@@ -81,7 +83,7 @@ func NewClient(projectID, dataSetID, tableID string, encoder encoding.Encoder, d
 	// This is a requirement given the streamer will batch its rows.
 	ctx := context.Background()
 
-	writer, err := managedwriter.NewClient(ctx, projectID)
+	writer, err := managedwriter.NewClient(ctx, projectID, opts...)
 	if err != nil {
 		return nil, fmt.Errorf("BQ Storage Client creation: create managed writer: %w", err)
 	}
diff --git a/streamer.go b/streamer.go
index 56694c0..d3f2539 100644
--- a/streamer.go
+++ b/streamer.go
@@ -31,6 +31,8 @@ import (
 	"github.com/OTA-Insight/bqwriter/internal/bigquery/storage"
 	"github.com/OTA-Insight/bqwriter/internal/bigquery/storage/encoding"
 	"github.com/OTA-Insight/bqwriter/log"
+
+	"google.golang.org/api/option"
 )
 
 // Streamer is a simple BQ stream-writer, allowing you
@@ -54,7 +56,7 @@ type streamerJob struct {
 //
 // An error is returned in case the Streamer Client couldn't be created for some unexpected reason,
 // most likely something going wrong within the layer of actually interacting with GCloud.
-func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error) {
+func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig, opts ...option.ClientOption) (*Streamer, error) {
 	return newStreamerWithClientBuilder(
 		ctx,
 		func(ctx context.Context, projectID, dataSetID, tableID string, logger log.Logger, insertAllCfg *InsertAllClientConfig, storageCfg *StorageClientConfig, batchCfg *BatchClientConfig) (bigquery.Client, error) {
@@ -83,7 +85,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
 			client, err := storage.NewClient(
 				projectID, dataSetID, tableID,
 				encoder, protobufDescriptor,
-				logger,
+				logger, opts...,
 			)
 			if err != nil {
 				return nil, fmt.Errorf("BigQuery: NewStreamer: New Storage client: %w", err)
@@ -97,6 +99,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
 				!batchCfg.FailForUnknownValues,
 				batchCfg.SourceFormat, batchCfg.WriteDisposition,
 				batchCfg.BigQuerySchema,
 				logger,
+				opts...,
 			)
 			if err != nil {
@@ -110,7 +113,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
 				!insertAllCfg.FailOnInvalidRows,
 				!insertAllCfg.FailForUnknownValues,
 				insertAllCfg.BatchSize,
-				logger,
+				logger, opts...,
 			)
 			if err != nil {
 				return nil, fmt.Errorf("BigQuery: NewStreamer: New InsertAll client: %w", err)
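With this change, callers can forward any `option.ClientOption` through `NewStreamer`, and the options are passed down to whichever underlying BigQuery client (insertAll, storage, or batch) the streamer ends up creating. A minimal usage sketch, assuming the root `bqwriter` package, a nil `StreamerConfig` to fall back to defaults, and purely illustrative project/dataset/table IDs and credentials path:

```go
package main

import (
	"context"
	"log"

	"github.com/OTA-Insight/bqwriter"
	"google.golang.org/api/option"
)

func main() {
	ctx := context.Background()

	// Client options given here are forwarded to the BigQuery client
	// created internally by the streamer; the credentials file path and
	// the project/dataset/table IDs below are placeholders.
	streamer, err := bqwriter.NewStreamer(
		ctx,
		"my-project", "my-dataset", "my-table",
		nil, // nil *StreamerConfig: use the package defaults
		option.WithCredentialsFile("/path/to/service-account.json"),
	)
	if err != nil {
		log.Fatalf("create BQ streamer: %v", err)
	}
	defer streamer.Close()
}
```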