From 3177366ca84848b509f647949ab22c8976a56d39 Mon Sep 17 00:00:00 2001 From: glendc Date: Mon, 25 Oct 2021 13:48:15 +0200 Subject: [PATCH] fix all golang-ci lint errors --- .golangci.yml | 94 +++++++++++++++++++++++++++++++++++++ bigquery_insert_all.go | 33 +++++++++---- bigquery_insert_all_test.go | 26 ++++++++-- bigquery_retry.go | 2 +- bigquery_retry_test.go | 28 +++++------ errors.go | 14 ++++++ errors_test.go | 6 +++ streamer.go | 16 +++---- streamer_config.go | 6 +-- streamer_config_test.go | 14 +++--- streamer_test.go | 18 +++---- 11 files changed, 202 insertions(+), 55 deletions(-) create mode 100644 .golangci.yml create mode 100644 errors.go create mode 100644 errors_test.go diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..51223c6 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,94 @@ +run: + timeout: 2m + issues-exit-code: 1 + +linters: + disable-all: true + enable: + # Enabled by default + - deadcode + # - errcheck + - gosimple + - govet + - ineffassign + - staticcheck + - structcheck + - typecheck + - unused + - varcheck + + # Disabled by default + - asciicheck + - bodyclose + # - cyclop + - depguard + - dogsled + - dupl + - durationcheck + - errorlint + # - exhaustive + # - exhaustivestruct + - exportloopref + - forbidigo + - forcetypeassert + # - funlen + # - gci + # - gochecknoglobals + # - gochecknoinits + # - gocognit + - goconst + # - gocritic + # - gocyclo + # - godot + # - godox + - goerr113 + - gofmt + # - gofumpt + - goheader + - goimports + # - gomnd + # - gomoddirectives + - gomodguard + - goprintffuncname + # - gosec + - ifshort + # - importas + # - lll + - makezero + - misspell + - nakedret + # - nestif + - nilerr + # - nlreturn + - noctx + - nolintlint + # - paralleltest + - prealloc + - predeclared + # - promlinter + # - revive + - rowserrcheck + - sqlclosecheck + # - stylecheck + - tagliatelle + # - testpackage + # thelper + - tparallel + - unconvert + # - unparam + - wastedassign + - whitespace + - wrapcheck + # - wsl + +linters-settings: + govet: + disable: + - copylocks + gosimple: + go: "1.17" + + +issues: + max-same-issues: 0 + max-issues-per-linter: 0 diff --git a/bigquery_insert_all.go b/bigquery_insert_all.go index 6f539f9..485f3c5 100644 --- a/bigquery_insert_all.go +++ b/bigquery_insert_all.go @@ -16,7 +16,6 @@ package bqwriter import ( "context" - "errors" "fmt" "time" @@ -65,12 +64,18 @@ func (bqc *stdBQInsertAllClient) Put(ctx context.Context, data interface{}) erro inserter := bqc.client.Dataset(bqc.dataSetID).Table(bqc.tableID).Inserter() inserter.SkipInvalidRows = bqc.skipInvalidRows inserter.IgnoreUnknownValues = bqc.ignoreUnknownValues - return inserter.Put(ctx, data) + if err := inserter.Put(ctx, data); err != nil { + return fmt.Errorf("put data into BQ using google-API inertAll: %w", err) + } + return nil } // Close implements bqInsertAllClient::Close func (bqc *stdBQInsertAllClient) Close() error { - return bqc.client.Close() + if err := bqc.client.Close(); err != nil { + return fmt.Errorf("close BQ google-API insertAll client: %w", err) + } + return nil } // newStdBQInsertAllClient creates a new stdBQInsertAllClient, @@ -95,16 +100,18 @@ func newStdBQInsertAllClient(projectID, dataSetID, tableID string, skipInvalidRo }, nil } +// TODO: test in unit test on this invalidParamErr, to ensure that this is the error we receive indeed!! + // newStdBQInsertAllThickClient creates a new bqInsertAllThickClient. func newStdBQInsertAllThickClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, batchSize int, maxRetryDeadlineOffset time.Duration, logger Logger) (*bqInsertAllThickClient, error) { if projectID == "" { - return nil, errors.New("NewStreamer: projectID is empty: should be defined") + return nil, fmt.Errorf("thick client creation: validate projectID: %w: missing", invalidParamErr) } if dataSetID == "" { - return nil, errors.New("NewStreamer: dataSetID is empty: should be defined") + return nil, fmt.Errorf("thick client creation: validate dataSetID: %w: missing", invalidParamErr) } if tableID == "" { - return nil, errors.New("NewStreamer: tableID is empty: should be defined") + return nil, fmt.Errorf("thick client creation: validate tableID: %w: missing", invalidParamErr) } client, err := newStdBQInsertAllClient(projectID, dataSetID, tableID, skipInvalidRows, ignoreUnknownValues) if err != nil { @@ -115,10 +122,10 @@ func newStdBQInsertAllThickClient(projectID, dataSetID, tableID string, skipInva func newBQInsertAllThickClient(client bqInsertAllClient, batchSize int, maxRetryDeadlineOffset time.Duration, logger Logger) (*bqInsertAllThickClient, error) { if client == nil { - return nil, errors.New("BQ Insert All Client expected") + return nil, fmt.Errorf("thick client creation: validate client: %w: missing", invalidParamErr) } if logger == nil { - return nil, errors.New("BQ InsertAll Client: Logger expected") + return nil, fmt.Errorf("thick client creation: logger client: %w: missing", invalidParamErr) } if batchSize <= 0 { batchSize = DefaultBatchSize @@ -176,7 +183,10 @@ func (bqc *bqInsertAllThickClient) Flush() (err error) { // we do wrap it with a deadline context to ensure we get a correct deadline ctx, cancelFunc := context.WithTimeout(context.Background(), bqc.maxRetryDeadlineOffset) defer cancelFunc() - return bqc.client.Put(ctx, bqc.rows) + if err := bqc.client.Put(ctx, bqc.rows); err != nil { + return fmt.Errorf("thick insertAll BQ client: put batched rows (count=%d): %w", len(bqc.rows), err) + } + return nil } // Close implements bqClient::Close @@ -184,5 +194,8 @@ func (bqc *bqInsertAllThickClient) Close() error { // no need to flush first, // as this is an internal client used by Streamer only, // which does flush prior to closing it :) - return bqc.client.Close() + if err := bqc.client.Close(); err != nil { + return fmt.Errorf("close thick insertAll BQ client: %w", err) + } + return nil } diff --git a/bigquery_insert_all_test.go b/bigquery_insert_all_test.go index f90cfad..c6315b1 100644 --- a/bigquery_insert_all_test.go +++ b/bigquery_insert_all_test.go @@ -17,6 +17,7 @@ package bqwriter import ( "context" "errors" + "fmt" "reflect" "sort" "testing" @@ -41,9 +42,8 @@ func (sbqc *subBQInsertAllClient) Put(ctx context.Context, data interface{}) err if sbqc.sleepPriorToPut > 0 { time.Sleep(sbqc.sleepPriorToPut) } - err := ctx.Err() - if err != nil { - return err + if err := ctx.Err(); err != nil { + return fmt.Errorf("put data into BQ using stub insertAll: %w", err) } if rows, ok := data.([]interface{}); ok { sbqc.rows = append(sbqc.rows, rows...) @@ -190,6 +190,26 @@ func TestNewStdBQInsertAllThickClientInputErrors(t *testing.T) { stdLogger{}, ) assertError(t, err) + assertEqual(t, true, errors.Is(err, invalidParamErr)) + assertNil(t, client) + } +} + +func TestNewBQInsertAllThickClientErrors(t *testing.T) { + testCases := []struct { + Client bqInsertAllClient + Logger Logger + }{ + {nil, nil}, + {new(subBQInsertAllClient), nil}, + {nil, testLogger{}}, + } + for _, testCase := range testCases { + client, err := newBQInsertAllThickClient( + testCase.Client, 0, 0, testCase.Logger, + ) + assertError(t, err) + assertEqual(t, true, errors.Is(err, invalidParamErr)) assertNil(t, client) } } diff --git a/bigquery_retry.go b/bigquery_retry.go index bbf7dc8..3af6d8c 100644 --- a/bigquery_retry.go +++ b/bigquery_retry.go @@ -105,7 +105,7 @@ func (r *bqRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { return 0, false } // correct the Max time, as to stay as close as possible to our max elapsed retry time - elapsedTime := time.Now().Sub(r.startTime) + elapsedTime := time.Since(r.startTime) r.backoff.Max = r.maxRetryDeadlineOffset - elapsedTime // retry with the pause time indicated by the gax BackOff algorithm r.retries += 1 diff --git a/bigquery_retry_test.go b/bigquery_retry_test.go index 1ac1555..7bbd1a0 100644 --- a/bigquery_retry_test.go +++ b/bigquery_retry_test.go @@ -16,7 +16,7 @@ package bqwriter import ( "context" - "errors" + "fmt" "strings" "testing" "time" @@ -34,11 +34,11 @@ func TestBQRetryerRetryOpFlowFailure(t *testing.T) { 1.1, nil, // no error filter ) - expectedFinalErr := errors.New("fourth error") + expectedFinalErr := fmt.Errorf("fourth error: %w", testStaticErr) errors := []error{ - errors.New("first error"), - errors.New("second error"), - errors.New("third error"), + fmt.Errorf("first error: %w", testStaticErr), + fmt.Errorf("second error: %w", testStaticErr), + fmt.Errorf("third error: %w", testStaticErr), expectedFinalErr, } op := func(context.Context) error { @@ -64,9 +64,9 @@ func TestBQRetryerRetryOpFlowSuccess(t *testing.T) { nil, // no error filter ) errors := []error{ - errors.New("first error"), - errors.New("second error"), - errors.New("third error"), + fmt.Errorf("first error: %w", testStaticErr), + fmt.Errorf("second error: %w", testStaticErr), + fmt.Errorf("third error: %w", testStaticErr), } op := func(context.Context) error { if len(errors) > 0 { @@ -106,7 +106,7 @@ func TestBQRetryerNoRetryBecauseOfCanceledContext(t *testing.T) { DefaultRetryDelayMultiplier, nil, // no error filter ) - pause, shouldRetry := retryer.Retry(errors.New("a test error")) + pause, shouldRetry := retryer.Retry(fmt.Errorf("retry: %w", testStaticErr)) assertEqual(t, false, shouldRetry) assertEqual(t, time.Duration(0), pause) } @@ -122,14 +122,14 @@ func TestBQRetryerNoRetryBecauseOfMaxRetries(t *testing.T) { ) // first time will work - pause, shouldRetry := retryer.Retry(errors.New("a test error")) + pause, shouldRetry := retryer.Retry(fmt.Errorf("retry: %w", testStaticErr)) assertEqual(t, true, shouldRetry) if pause == 0 || pause > DefaultInitialRetryDelay { t.Errorf("unexpeted pause duration: %v", pause) } // second time not, as we reached our limit of max retries - pause, shouldRetry = retryer.Retry(errors.New("a test error")) + pause, shouldRetry = retryer.Retry(fmt.Errorf("retry: %w", testStaticErr)) assertEqual(t, false, shouldRetry) assertEqual(t, time.Duration(0), pause) } @@ -147,14 +147,14 @@ func TestBQRetryerNoRetryBecauseOfErrorFilter(t *testing.T) { ) // first time will work, as the filter didn't trigger - pause, shouldRetry := retryer.Retry(errors.New("retry this test error please")) + pause, shouldRetry := retryer.Retry(fmt.Errorf("retry (should continue): %w", testStaticErr)) assertEqual(t, true, shouldRetry) if pause == 0 || pause > DefaultInitialRetryDelay { t.Errorf("unexpeted pause duration: %v", pause) } // second time not, as we triggered the error filter in the wrong way - pause, shouldRetry = retryer.Retry(errors.New("another test error, but please do not stop! :(")) + pause, shouldRetry = retryer.Retry(fmt.Errorf("retry (should stop): %w", testStaticErr)) assertEqual(t, false, shouldRetry) assertEqual(t, time.Duration(0), pause) } @@ -176,7 +176,7 @@ func TestBQGRPCRetryErrorFilterFalse(t *testing.T) { // nil error is not an accepted error assertEqual(t, false, bqGRPCRetryErrorFilter(nil)) // custom error is not an accepted error - assertEqual(t, false, bqGRPCRetryErrorFilter(errors.New("todo"))) + assertEqual(t, false, bqGRPCRetryErrorFilter(fmt.Errorf("todo: %w", testStaticErr))) // correct error, but wrong code err := status.New(codes.Aborted, "test error").Err() assertEqual(t, false, bqGRPCRetryErrorFilter(err)) diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..ce355d0 --- /dev/null +++ b/errors.go @@ -0,0 +1,14 @@ +package bqwriter + +import "errors" + +// internal errors, as to not lock them in as part of the API, +// given these are errors not meant to be caught by users but really indicating a bug +var ( + // invalidParamErr is returned in case we exit a function early with an error, + // due to an invalid parameter passed in by the callee. + invalidParamErr = errors.New("invalid function parameter") + + // notSupportedErr is returned in case a feature is used that is not (yet) supported. + notSupportedErr = errors.New("not supported") +) diff --git a/errors_test.go b/errors_test.go new file mode 100644 index 0000000..364b13e --- /dev/null +++ b/errors_test.go @@ -0,0 +1,6 @@ +package bqwriter + +import "errors" + +// testStaticErr is an error used for testing purposes only +var testStaticErr = errors.New("static test error") diff --git a/streamer.go b/streamer.go index 7187f04..d41926f 100644 --- a/streamer.go +++ b/streamer.go @@ -50,7 +50,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg ctx, func(ctx context.Context, projectID, dataSetID, tableID string, logger Logger, insertAllCfg *InsertAllClientConfig, storageCfg *StorageClientConfig) (bqClient, error) { if storageCfg != nil { - return nil, errors.New("create new streamer: storage client isn't supported yet") + return nil, fmt.Errorf("create new streamer: using storage client: %w", notSupportedErr) } return newStdBQInsertAllThickClient( @@ -70,13 +70,13 @@ type clientBuilderFunc func(ctx context.Context, projectID, dataSetID, tableID s func newStreamerWithClientBuilder(ctx context.Context, clientBuilder clientBuilderFunc, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error) { if projectID == "" { - return nil, errors.New("NewStreamerBuilder: projectID is empty: should be defined") + return nil, fmt.Errorf("streamer client creation: validate projectID: %w: missing", invalidParamErr) } if dataSetID == "" { - return nil, errors.New("NewStreamerBuilder: dataSetID is empty: should be defined") + return nil, fmt.Errorf("streamer client creation: validate dataSetID: %w: missing", invalidParamErr) } if tableID == "" { - return nil, errors.New("NewStreamerBuilder: tableID is empty: should be defined") + return nil, fmt.Errorf("streamer client creation: validate tableID: %w: missing", invalidParamErr) } // sanitize cfg @@ -129,19 +129,19 @@ func newStreamerWithClientBuilder(ctx context.Context, clientBuilder clientBuild // configured to do so. func (s *Streamer) Write(data interface{}) error { if data == nil { - return errors.New("Streamer::Write: data is nil: should be defined") + return fmt.Errorf("streamer client write: validate data: %w: nil data", invalidParamErr) } job := streamerJob{ Data: data, } - if errors.Is(s.workerCtx.Err(), context.Canceled) { - return errors.New("write data into BQ streamer: streamer is already closed") + if err := s.workerCtx.Err(); errors.Is(err, context.Canceled) { + return fmt.Errorf("write data into BQ streamer: streamer worker context: %w", err) } select { case s.workerCh <- job: s.logger.Debug("inserted write job into bq streamer") case <-s.workerCtx.Done(): - return errors.New("write data into BQ streamer: worker is busy: streamer is already closed") + return fmt.Errorf("write data into BQ streamer: worker is busy: streamer worker context: %w", context.Canceled) } return nil } diff --git a/streamer_config.go b/streamer_config.go index d708a83..9ea26b2 100644 --- a/streamer_config.go +++ b/streamer_config.go @@ -196,7 +196,7 @@ func sanitizeStreamerConfig(cfg *StreamerConfig) (sanCfg *StreamerConfig) { sanCfg.StorageClient = sanitizeStorageClientConfig(cfg.StorageClient) // return the sanitized named output config - return + return sanCfg } // sanitizeInsertAllClientConfig is used to fill in some or all properties @@ -239,7 +239,7 @@ func sanitizeInsertAllClientConfig(cfg *InsertAllClientConfig) (sanCfg *InsertAl } // return the sanitized named output config - return + return sanCfg } // sanitizeStorageClientConfig is used to fill in some or all properties @@ -300,5 +300,5 @@ func sanitizeStorageClientConfig(cfg *StorageClientConfig) (sanCfg *StorageClien } // return the sanitized named output non-nil config - return + return sanCfg } diff --git a/streamer_config_test.go b/streamer_config_test.go index 32140b2..f9ec8f0 100644 --- a/streamer_config_test.go +++ b/streamer_config_test.go @@ -23,17 +23,17 @@ func deepCloneStreamerConfig(cfg *StreamerConfig) *StreamerConfig { if cfg == nil { return nil } - copy := new(StreamerConfig) - *copy = *cfg + cfgCopy := new(StreamerConfig) + *cfgCopy = *cfg if cfg.InsertAllClient != nil { - copy.InsertAllClient = new(InsertAllClientConfig) - *(copy.InsertAllClient) = *(cfg.InsertAllClient) + cfgCopy.InsertAllClient = new(InsertAllClientConfig) + *(cfgCopy.InsertAllClient) = *(cfg.InsertAllClient) } if cfg.StorageClient != nil { - copy.StorageClient = new(StorageClientConfig) - *(copy.StorageClient) = *(cfg.StorageClient) + cfgCopy.StorageClient = new(StorageClientConfig) + *(cfgCopy.StorageClient) = *(cfg.StorageClient) } - return copy + return cfgCopy } var ( diff --git a/streamer_test.go b/streamer_test.go index 5a676cb..579cb71 100644 --- a/streamer_test.go +++ b/streamer_test.go @@ -17,6 +17,7 @@ package bqwriter import ( "context" "errors" + "fmt" "reflect" "sort" "testing" @@ -38,18 +39,15 @@ func TestNewStreamerInputErrors(t *testing.T) { {"", "a", "b"}, {"", "a", "b"}, } - for testCaseIndex, testCase := range testCases { + for _, testCase := range testCases { builder, err := NewStreamer( context.Background(), testCase.ProjectID, testCase.DataSetID, testCase.TableID, nil, ) - if err == nil { - t.Errorf("testCase #%d: expected an error to be returned, non was received", testCaseIndex) - } - if builder != nil { - t.Errorf("testCase #%d: expected builder to be nil, received one", testCaseIndex) - } + assertError(t, err) + assertEqual(t, true, errors.Is(err, invalidParamErr)) + assertNil(t, builder) } } @@ -209,12 +207,14 @@ func TestStreamerWriteErrorAlreadyClosed(t *testing.T) { func TestStreamerWriteErrorNilData(t *testing.T) { _, streamer := newTestStreamer(context.Background(), t, testStreamerConfig{}) defer streamer.Close() - assertError(t, streamer.Write(nil)) + err := streamer.Write(nil) + assertError(t, err) + assertEqual(t, true, errors.Is(err, invalidParamErr)) } func TestStreamerCloseError(t *testing.T) { client, streamer := newTestStreamer(context.Background(), t, testStreamerConfig{}) - client.AddNextError(errors.New("some client close error")) + client.AddNextError(fmt.Errorf("some client close error: %w", testStaticErr)) streamer.Close() // this is logged to stderr, so should be okay for user }