Skip to content

Commit

Permalink
fix all golang-ci lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Oct 25, 2021
1 parent 08db112 commit 3177366
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 55 deletions.
94 changes: 94 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
run:
timeout: 2m
issues-exit-code: 1

linters:
disable-all: true
enable:
# Enabled by default
- deadcode
# - errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck

# Disabled by default
- asciicheck
- bodyclose
# - cyclop
- depguard
- dogsled
- dupl
- durationcheck
- errorlint
# - exhaustive
# - exhaustivestruct
- exportloopref
- forbidigo
- forcetypeassert
# - funlen
# - gci
# - gochecknoglobals
# - gochecknoinits
# - gocognit
- goconst
# - gocritic
# - gocyclo
# - godot
# - godox
- goerr113
- gofmt
# - gofumpt
- goheader
- goimports
# - gomnd
# - gomoddirectives
- gomodguard
- goprintffuncname
# - gosec
- ifshort
# - importas
# - lll
- makezero
- misspell
- nakedret
# - nestif
- nilerr
# - nlreturn
- noctx
- nolintlint
# - paralleltest
- prealloc
- predeclared
# - promlinter
# - revive
- rowserrcheck
- sqlclosecheck
# - stylecheck
- tagliatelle
# - testpackage
# thelper
- tparallel
- unconvert
# - unparam
- wastedassign
- whitespace
- wrapcheck
# - wsl

linters-settings:
govet:
disable:
- copylocks
gosimple:
go: "1.17"


issues:
max-same-issues: 0
max-issues-per-linter: 0
33 changes: 23 additions & 10 deletions bigquery_insert_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package bqwriter

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -65,12 +64,18 @@ func (bqc *stdBQInsertAllClient) Put(ctx context.Context, data interface{}) erro
inserter := bqc.client.Dataset(bqc.dataSetID).Table(bqc.tableID).Inserter()
inserter.SkipInvalidRows = bqc.skipInvalidRows
inserter.IgnoreUnknownValues = bqc.ignoreUnknownValues
return inserter.Put(ctx, data)
if err := inserter.Put(ctx, data); err != nil {
return fmt.Errorf("put data into BQ using google-API inertAll: %w", err)
}
return nil
}

// Close implements bqInsertAllClient::Close
func (bqc *stdBQInsertAllClient) Close() error {
return bqc.client.Close()
if err := bqc.client.Close(); err != nil {
return fmt.Errorf("close BQ google-API insertAll client: %w", err)
}
return nil
}

// newStdBQInsertAllClient creates a new stdBQInsertAllClient,
Expand All @@ -95,16 +100,18 @@ func newStdBQInsertAllClient(projectID, dataSetID, tableID string, skipInvalidRo
}, nil
}

// TODO: test in unit test on this invalidParamErr, to ensure that this is the error we receive indeed!!

// newStdBQInsertAllThickClient creates a new bqInsertAllThickClient.
func newStdBQInsertAllThickClient(projectID, dataSetID, tableID string, skipInvalidRows, ignoreUnknownValues bool, batchSize int, maxRetryDeadlineOffset time.Duration, logger Logger) (*bqInsertAllThickClient, error) {
if projectID == "" {
return nil, errors.New("NewStreamer: projectID is empty: should be defined")
return nil, fmt.Errorf("thick client creation: validate projectID: %w: missing", invalidParamErr)
}
if dataSetID == "" {
return nil, errors.New("NewStreamer: dataSetID is empty: should be defined")
return nil, fmt.Errorf("thick client creation: validate dataSetID: %w: missing", invalidParamErr)
}
if tableID == "" {
return nil, errors.New("NewStreamer: tableID is empty: should be defined")
return nil, fmt.Errorf("thick client creation: validate tableID: %w: missing", invalidParamErr)
}
client, err := newStdBQInsertAllClient(projectID, dataSetID, tableID, skipInvalidRows, ignoreUnknownValues)
if err != nil {
Expand All @@ -115,10 +122,10 @@ func newStdBQInsertAllThickClient(projectID, dataSetID, tableID string, skipInva

func newBQInsertAllThickClient(client bqInsertAllClient, batchSize int, maxRetryDeadlineOffset time.Duration, logger Logger) (*bqInsertAllThickClient, error) {
if client == nil {
return nil, errors.New("BQ Insert All Client expected")
return nil, fmt.Errorf("thick client creation: validate client: %w: missing", invalidParamErr)
}
if logger == nil {
return nil, errors.New("BQ InsertAll Client: Logger expected")
return nil, fmt.Errorf("thick client creation: logger client: %w: missing", invalidParamErr)
}
if batchSize <= 0 {
batchSize = DefaultBatchSize
Expand Down Expand Up @@ -176,13 +183,19 @@ func (bqc *bqInsertAllThickClient) Flush() (err error) {
// we do wrap it with a deadline context to ensure we get a correct deadline
ctx, cancelFunc := context.WithTimeout(context.Background(), bqc.maxRetryDeadlineOffset)
defer cancelFunc()
return bqc.client.Put(ctx, bqc.rows)
if err := bqc.client.Put(ctx, bqc.rows); err != nil {
return fmt.Errorf("thick insertAll BQ client: put batched rows (count=%d): %w", len(bqc.rows), err)
}
return nil
}

// Close implements bqClient::Close
func (bqc *bqInsertAllThickClient) Close() error {
// no need to flush first,
// as this is an internal client used by Streamer only,
// which does flush prior to closing it :)
return bqc.client.Close()
if err := bqc.client.Close(); err != nil {
return fmt.Errorf("close thick insertAll BQ client: %w", err)
}
return nil
}
26 changes: 23 additions & 3 deletions bigquery_insert_all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bqwriter
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"testing"
Expand All @@ -41,9 +42,8 @@ func (sbqc *subBQInsertAllClient) Put(ctx context.Context, data interface{}) err
if sbqc.sleepPriorToPut > 0 {
time.Sleep(sbqc.sleepPriorToPut)
}
err := ctx.Err()
if err != nil {
return err
if err := ctx.Err(); err != nil {
return fmt.Errorf("put data into BQ using stub insertAll: %w", err)
}
if rows, ok := data.([]interface{}); ok {
sbqc.rows = append(sbqc.rows, rows...)
Expand Down Expand Up @@ -190,6 +190,26 @@ func TestNewStdBQInsertAllThickClientInputErrors(t *testing.T) {
stdLogger{},
)
assertError(t, err)
assertEqual(t, true, errors.Is(err, invalidParamErr))
assertNil(t, client)
}
}

func TestNewBQInsertAllThickClientErrors(t *testing.T) {
testCases := []struct {
Client bqInsertAllClient
Logger Logger
}{
{nil, nil},
{new(subBQInsertAllClient), nil},
{nil, testLogger{}},
}
for _, testCase := range testCases {
client, err := newBQInsertAllThickClient(
testCase.Client, 0, 0, testCase.Logger,
)
assertError(t, err)
assertEqual(t, true, errors.Is(err, invalidParamErr))
assertNil(t, client)
}
}
2 changes: 1 addition & 1 deletion bigquery_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *bqRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
return 0, false
}
// correct the Max time, as to stay as close as possible to our max elapsed retry time
elapsedTime := time.Now().Sub(r.startTime)
elapsedTime := time.Since(r.startTime)
r.backoff.Max = r.maxRetryDeadlineOffset - elapsedTime
// retry with the pause time indicated by the gax BackOff algorithm
r.retries += 1
Expand Down
28 changes: 14 additions & 14 deletions bigquery_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package bqwriter

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
Expand All @@ -34,11 +34,11 @@ func TestBQRetryerRetryOpFlowFailure(t *testing.T) {
1.1,
nil, // no error filter
)
expectedFinalErr := errors.New("fourth error")
expectedFinalErr := fmt.Errorf("fourth error: %w", testStaticErr)
errors := []error{
errors.New("first error"),
errors.New("second error"),
errors.New("third error"),
fmt.Errorf("first error: %w", testStaticErr),
fmt.Errorf("second error: %w", testStaticErr),
fmt.Errorf("third error: %w", testStaticErr),
expectedFinalErr,
}
op := func(context.Context) error {
Expand All @@ -64,9 +64,9 @@ func TestBQRetryerRetryOpFlowSuccess(t *testing.T) {
nil, // no error filter
)
errors := []error{
errors.New("first error"),
errors.New("second error"),
errors.New("third error"),
fmt.Errorf("first error: %w", testStaticErr),
fmt.Errorf("second error: %w", testStaticErr),
fmt.Errorf("third error: %w", testStaticErr),
}
op := func(context.Context) error {
if len(errors) > 0 {
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestBQRetryerNoRetryBecauseOfCanceledContext(t *testing.T) {
DefaultRetryDelayMultiplier,
nil, // no error filter
)
pause, shouldRetry := retryer.Retry(errors.New("a test error"))
pause, shouldRetry := retryer.Retry(fmt.Errorf("retry: %w", testStaticErr))
assertEqual(t, false, shouldRetry)
assertEqual(t, time.Duration(0), pause)
}
Expand All @@ -122,14 +122,14 @@ func TestBQRetryerNoRetryBecauseOfMaxRetries(t *testing.T) {
)

// first time will work
pause, shouldRetry := retryer.Retry(errors.New("a test error"))
pause, shouldRetry := retryer.Retry(fmt.Errorf("retry: %w", testStaticErr))
assertEqual(t, true, shouldRetry)
if pause == 0 || pause > DefaultInitialRetryDelay {
t.Errorf("unexpeted pause duration: %v", pause)
}

// second time not, as we reached our limit of max retries
pause, shouldRetry = retryer.Retry(errors.New("a test error"))
pause, shouldRetry = retryer.Retry(fmt.Errorf("retry: %w", testStaticErr))
assertEqual(t, false, shouldRetry)
assertEqual(t, time.Duration(0), pause)
}
Expand All @@ -147,14 +147,14 @@ func TestBQRetryerNoRetryBecauseOfErrorFilter(t *testing.T) {
)

// first time will work, as the filter didn't trigger
pause, shouldRetry := retryer.Retry(errors.New("retry this test error please"))
pause, shouldRetry := retryer.Retry(fmt.Errorf("retry (should continue): %w", testStaticErr))
assertEqual(t, true, shouldRetry)
if pause == 0 || pause > DefaultInitialRetryDelay {
t.Errorf("unexpeted pause duration: %v", pause)
}

// second time not, as we triggered the error filter in the wrong way
pause, shouldRetry = retryer.Retry(errors.New("another test error, but please do not stop! :("))
pause, shouldRetry = retryer.Retry(fmt.Errorf("retry (should stop): %w", testStaticErr))
assertEqual(t, false, shouldRetry)
assertEqual(t, time.Duration(0), pause)
}
Expand All @@ -176,7 +176,7 @@ func TestBQGRPCRetryErrorFilterFalse(t *testing.T) {
// nil error is not an accepted error
assertEqual(t, false, bqGRPCRetryErrorFilter(nil))
// custom error is not an accepted error
assertEqual(t, false, bqGRPCRetryErrorFilter(errors.New("todo")))
assertEqual(t, false, bqGRPCRetryErrorFilter(fmt.Errorf("todo: %w", testStaticErr)))
// correct error, but wrong code
err := status.New(codes.Aborted, "test error").Err()
assertEqual(t, false, bqGRPCRetryErrorFilter(err))
Expand Down
14 changes: 14 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bqwriter

import "errors"

// internal errors, as to not lock them in as part of the API,
// given these are errors not meant to be caught by users but really indicating a bug
var (
// invalidParamErr is returned in case we exit a function early with an error,
// due to an invalid parameter passed in by the callee.
invalidParamErr = errors.New("invalid function parameter")

// notSupportedErr is returned in case a feature is used that is not (yet) supported.
notSupportedErr = errors.New("not supported")
)
6 changes: 6 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bqwriter

import "errors"

// testStaticErr is an error used for testing purposes only
var testStaticErr = errors.New("static test error")
16 changes: 8 additions & 8 deletions streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewStreamer(ctx context.Context, projectID, dataSetID, tableID string, cfg
ctx,
func(ctx context.Context, projectID, dataSetID, tableID string, logger Logger, insertAllCfg *InsertAllClientConfig, storageCfg *StorageClientConfig) (bqClient, error) {
if storageCfg != nil {
return nil, errors.New("create new streamer: storage client isn't supported yet")
return nil, fmt.Errorf("create new streamer: using storage client: %w", notSupportedErr)
}

return newStdBQInsertAllThickClient(
Expand All @@ -70,13 +70,13 @@ type clientBuilderFunc func(ctx context.Context, projectID, dataSetID, tableID s

func newStreamerWithClientBuilder(ctx context.Context, clientBuilder clientBuilderFunc, projectID, dataSetID, tableID string, cfg *StreamerConfig) (*Streamer, error) {
if projectID == "" {
return nil, errors.New("NewStreamerBuilder: projectID is empty: should be defined")
return nil, fmt.Errorf("streamer client creation: validate projectID: %w: missing", invalidParamErr)
}
if dataSetID == "" {
return nil, errors.New("NewStreamerBuilder: dataSetID is empty: should be defined")
return nil, fmt.Errorf("streamer client creation: validate dataSetID: %w: missing", invalidParamErr)
}
if tableID == "" {
return nil, errors.New("NewStreamerBuilder: tableID is empty: should be defined")
return nil, fmt.Errorf("streamer client creation: validate tableID: %w: missing", invalidParamErr)
}

// sanitize cfg
Expand Down Expand Up @@ -129,19 +129,19 @@ func newStreamerWithClientBuilder(ctx context.Context, clientBuilder clientBuild
// configured to do so.
func (s *Streamer) Write(data interface{}) error {
if data == nil {
return errors.New("Streamer::Write: data is nil: should be defined")
return fmt.Errorf("streamer client write: validate data: %w: nil data", invalidParamErr)
}
job := streamerJob{
Data: data,
}
if errors.Is(s.workerCtx.Err(), context.Canceled) {
return errors.New("write data into BQ streamer: streamer is already closed")
if err := s.workerCtx.Err(); errors.Is(err, context.Canceled) {
return fmt.Errorf("write data into BQ streamer: streamer worker context: %w", err)
}
select {
case s.workerCh <- job:
s.logger.Debug("inserted write job into bq streamer")
case <-s.workerCtx.Done():
return errors.New("write data into BQ streamer: worker is busy: streamer is already closed")
return fmt.Errorf("write data into BQ streamer: worker is busy: streamer worker context: %w", context.Canceled)
}
return nil
}
Expand Down
Loading

0 comments on commit 3177366

Please sign in to comment.