Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support connecting to local BigQuery emulator #11

Merged
merged 3 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/bigquery/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/OTA-Insight/bqwriter/log"

"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
)

var (
Expand All @@ -48,12 +49,12 @@ type Client struct {
}

// NewClient creates a new Client.
func NewClient(projectID, dataSetID, tableID string, ignoreUnknownValues bool, sourceFormat bigquery.DataFormat, writeDisposition bigquery.TableWriteDisposition, schema *bigquery.Schema, logger log.Logger) (*Client, error) {
func NewClient(projectID, dataSetID, tableID string, ignoreUnknownValues bool, sourceFormat bigquery.DataFormat, writeDisposition bigquery.TableWriteDisposition, schema *bigquery.Schema, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
// NOTE: we are using the background Context,
// as to ensure that we can always write to the client,
// even when the actual parent context is already done.
// This is a requirement given the streamer will batch its rows.
client, err := bigquery.NewClient(context.Background(), projectID)
client, err := bigquery.NewClient(context.Background(), projectID, opts...)
if err != nil {
return nil, fmt.Errorf("BQ batch client: creation failed: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/bigquery/insertall/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/OTA-Insight/bqwriter/internal"
internalBQ "github.com/OTA-Insight/bqwriter/internal/bigquery"
"github.com/OTA-Insight/bqwriter/log"

"google.golang.org/api/option"
)

// Client implements the standard/official BQ (cloud) Client,
Expand Down Expand Up @@ -49,7 +51,7 @@ type bqClient interface {
}

// NewClient creates a new Client.
func NewClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, batchSize int, logger log.Logger) (*Client, error) {
func NewClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, batchSize int, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
if projectID == "" {
return nil, fmt.Errorf("bq insertAll client creation: validate projectID: %w: missing", internal.ErrInvalidParam)
}
Expand All @@ -59,7 +61,7 @@ func NewClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnkn
if tableID == "" {
return nil, fmt.Errorf("bq insertAll client creation: validate tableID: %w: missing", internal.ErrInvalidParam)
}
client, err := newStdBQClient(projectID, dataSetID, tableID, skipInvalidRows, ignoreUnknownValues)
client, err := newStdBQClient(projectID, dataSetID, tableID, skipInvalidRows, ignoreUnknownValues, opts...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/bigquery/insertall/client_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
)

// stdBQClient impements bqClient using the official Golang Gcloud BigQuery API client.
Expand Down Expand Up @@ -53,12 +54,12 @@ func (bqc *stdBQClient) Close() error {

// newStdBQClient creates a new Client,
// a production-ready implementation of bqClient.
func newStdBQClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool) (*stdBQClient, error) {
func newStdBQClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, opts ...option.ClientOption) (*stdBQClient, error) {
// NOTE: we are using the background Context,
// as to ensure that we can always write to the client,
// even when the actual parent context is already done.
// This is a requirement given the streamer will batch its rows.
client, err := bigquery.NewClient(context.Background(), projectID)
client, err := bigquery.NewClient(context.Background(), projectID, opts...)
if err != nil {
return nil, fmt.Errorf("create BQ Insert All Client: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/bigquery/storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (

"github.com/OTA-Insight/bqwriter/internal"
"github.com/OTA-Insight/bqwriter/internal/bigquery/storage/encoding"

"github.com/OTA-Insight/bqwriter/log"

"cloud.google.com/go/bigquery/storage/managedwriter"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/descriptorpb"
Expand Down Expand Up @@ -58,7 +60,7 @@ type Client struct {

// NewClient creates a new BQ Storage Client.
// See the documentation of Client for more information how to use it.
func NewClient(projectID, dataSetID, tableID string, encoder encoding.Encoder, dp *descriptorpb.DescriptorProto, logger log.Logger) (*Client, error) {
func NewClient(projectID, dataSetID, tableID string, encoder encoding.Encoder, dp *descriptorpb.DescriptorProto, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
if projectID == "" {
return nil, fmt.Errorf("bq storage client creation: validate projectID: %w: missing", internal.ErrInvalidParam)
}
Expand All @@ -81,7 +83,7 @@ func NewClient(projectID, dataSetID, tableID string, encoder encoding.Encoder, d
// This is a requirement given the streamer will batch its rows.
ctx := context.Background()

writer, err := managedwriter.NewClient(ctx, projectID)
writer, err := managedwriter.NewClient(ctx, projectID, opts...)
if err != nil {
return nil, fmt.Errorf("BQ Storage Client creation: create managed writer: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/OTA-Insight/bqwriter/internal/bigquery/storage"
"github.com/OTA-Insight/bqwriter/internal/bigquery/storage/encoding"
"github.com/OTA-Insight/bqwriter/log"

"google.golang.org/api/option"
)

// Streamer is a simple BQ stream-writer, allowing you
Expand All @@ -54,7 +56,7 @@ type streamerJob struct {
//
// An error is returned in case the Streamer Client couldn't be created for some unexpected reason,
// most likely something going wrong within the layer of actually interacting with GCloud.
func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error) {
func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg *StreamerConfig, opts ...option.ClientOption) (*Streamer, error) {
return newStreamerWithClientBuilder(
ctx,
func(ctx context.Context, projectID, dataSetID, tableID string, logger log.Logger, insertAllCfg *InsertAllClientConfig, storageCfg *StorageClientConfig, batchCfg *BatchClientConfig) (bigquery.Client, error) {
Expand Down Expand Up @@ -83,7 +85,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
client, err := storage.NewClient(
projectID, dataSetID, tableID,
encoder, protobufDescriptor,
logger,
logger, opts...,
)
if err != nil {
return nil, fmt.Errorf("BigQuery: NewStreamer: New Storage client: %w", err)
Expand All @@ -97,6 +99,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
!batchCfg.FailForUnknownValues,
batchCfg.SourceFormat, batchCfg.WriteDisposition,
batchCfg.BigQuerySchema, logger,
opts...,
)

if err != nil {
Expand All @@ -110,7 +113,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
!insertAllCfg.FailOnInvalidRows,
!insertAllCfg.FailForUnknownValues,
insertAllCfg.BatchSize,
logger,
logger, opts...,
)
if err != nil {
return nil, fmt.Errorf("BigQuery: NewStreamer: New InsertAll client: %w", err)
Expand Down