Skip to content

Commit

Permalink
refactor retry functinoality into a separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund committed Oct 3, 2023
1 parent 2901573 commit 1420a2b
Show file tree
Hide file tree
Showing 22 changed files with 349 additions and 309 deletions.
12 changes: 6 additions & 6 deletions aliases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAliasIntegrationSuite(t *testing.T) {
func (s *AliasIntegrationSuite) TestGetAlias() {
ctx := test.Context()

alias, err := s.rc.GetAlias(ctx, persistentWorkspace, persistentAlias)
alias, err := s.rc.GetAlias(ctx, test.PersistentWorkspace, test.PersistentAlias)
require.NoError(s.T(), err)
assert.Equal(s.T(), "[email protected]", alias.GetCreatorEmail())
}
Expand All @@ -45,7 +45,7 @@ func (s *AliasIntegrationSuite) TestListAliases() {
func (s *AliasIntegrationSuite) TestListAliasesForWorkspace() {
ctx := test.Context()

aliases, err := s.rc.ListAliases(ctx, option.WithAliasWorkspace(persistentWorkspace))
aliases, err := s.rc.ListAliases(ctx, option.WithAliasWorkspace(test.PersistentWorkspace))
require.NoError(s.T(), err)
for _, a := range aliases {
s.T().Logf("workspace: %s", a.GetName())
Expand All @@ -55,17 +55,17 @@ func (s *AliasIntegrationSuite) TestListAliasesForWorkspace() {
func (s *AliasIntegrationSuite) TestAliases() {
ctx := test.Context()

_, err := s.rc.CreateAlias(ctx, persistentWorkspace, s.alias, []string{"commons._events"})
_, err := s.rc.CreateAlias(ctx, test.PersistentWorkspace, s.alias, []string{"commons._events"})
require.NoError(s.T(), err)

err = s.rc.WaitUntilAliasAvailable(ctx, persistentWorkspace, s.alias)
err = s.rc.WaitUntilAliasAvailable(ctx, test.PersistentWorkspace, s.alias)
require.NoError(s.T(), err)

// update

err = s.rc.WaitUntilAliasAvailable(ctx, persistentWorkspace, s.alias)
err = s.rc.WaitUntilAliasAvailable(ctx, test.PersistentWorkspace, s.alias)
require.NoError(s.T(), err)

err = s.rc.DeleteAlias(ctx, persistentWorkspace, s.alias)
err = s.rc.DeleteAlias(ctx, test.PersistentWorkspace, s.alias)
require.NoError(s.T(), err)
}
2 changes: 1 addition & 1 deletion collections_cc_kc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestConfluentCloudWithKafkaConnectIntegrationSuite(t *testing.T) {
test.SkipUnlessIntegrationTest(t)
test.SkipUnlessDocker(t)

name := randomName("cckc")
name := test.RandomName("cckc")

s := ConfluentCloudWithKafkaConnectIntegrationSuite{
rc: test.Client(t),
Expand Down
10 changes: 5 additions & 5 deletions collections_cc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func TestConfluentCloudIntegrationSuite(t *testing.T) {

s := ConfluentCloudIntegrationSuite{
rc: test.Client(t),
integrationName: randomName("integration"),
ws: randomName("cc"),
coll: randomName("cc"),
integrationName: test.RandomName("integration"),
ws: test.RandomName("cc"),
coll: test.RandomName("cc"),
topic: "test_json",
bootstrapServers: test.SkipUnlessEnvSet(t, "CC_BOOTSTRAP_SERVERS"),
confluentKey: test.SkipUnlessEnvSet(t, "CC_KEY"),
Expand All @@ -50,7 +50,7 @@ func (s *ConfluentCloudIntegrationSuite) SetupSuite() {
s.Require().NoError(err)

_, err = s.rc.CreateKafkaIntegration(ctx, s.integrationName,
option.WithKafkaIntegrationDescription(description()),
option.WithKafkaIntegrationDescription(test.Description()),
option.WithKafkaV3(),
option.WithKafkaBootstrapServers(s.bootstrapServers),
option.WithKafkaSecurityConfig(s.confluentKey, s.confluentSecret),
Expand All @@ -75,7 +75,7 @@ func (s *ConfluentCloudIntegrationSuite) TestCreateJSONCollection() {
ctx := test.Context()

_, err := s.rc.CreateKafkaCollection(ctx, s.ws, s.coll,
option.WithCollectionDescription(description()),
option.WithCollectionDescription(test.Description()),
option.WithCollectionRetention(time.Hour),
option.WithKafkaSource(s.integrationName, s.topic, option.KafkaStartingOffsetEarliest, option.WithJSONFormat(),
option.WithKafkaSourceV3(),
Expand Down
6 changes: 3 additions & 3 deletions collections_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestKafkaIntegrationSuite(t *testing.T) {
rc: test.Client(t),
kc: kafkaConfig{
topic: "test_json",
integrationName: randomName("kafka"),
workspace: randomName("kafka"),
collection: randomName("kafka"),
integrationName: test.RandomName("kafka"),
workspace: test.RandomName("kafka"),
collection: test.RandomName("kafka"),
},
bootstrapServers: test.SkipUnlessEnvSet(t, "CC_BOOTSTRAP_SERVERS"),
confluentKey: test.SkipUnlessEnvSet(t, "CC_KEY"),
Expand Down
14 changes: 7 additions & 7 deletions collections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (s *CollectionTestSuite) BeforeTest(suiteName, testName string) {
func (s *CollectionTestSuite) TestGetCollection() {
ctx := test.Context()

collection, err := s.rc.GetCollection(ctx, persistentWorkspace, persistentCollection)
collection, err := s.rc.GetCollection(ctx, test.PersistentWorkspace, test.PersistentCollection)
s.NoError(err)
s.Equal(persistentCollection, collection.GetName())
s.Equal(test.PersistentCollection, collection.GetName())
}

func (s *CollectionTestSuite) TestListAllCollections() {
Expand All @@ -78,15 +78,15 @@ func (s *CollectionTestSuite) TestListAllCollections() {
func (s *CollectionTestSuite) TestListCollectionsInWorkspace() {
ctx := test.Context()

collections, err := s.rc.ListCollections(ctx, option.WithWorkspace(persistentWorkspace))
collections, err := s.rc.ListCollections(ctx, option.WithWorkspace(test.PersistentWorkspace))
s.NoError(err)

s.T().Logf("collections in %s: %d", persistentWorkspace, len(collections))
s.T().Logf("collections in %s: %d", test.PersistentWorkspace, len(collections))
}

func (s *CollectionTestSuite) TestCreateSampleCitiesCollection() {
ctx := test.Context()
name := randomName("cities")
name := test.RandomName("cities")

_, err := s.rc.CreateCollection(ctx, s.ws, name,
option.WithSampleDataset(dataset.Cities))
Expand All @@ -99,7 +99,7 @@ func (s *CollectionTestSuite) TestCreateSampleCitiesCollection() {

func (s *CollectionTestSuite) TestCreateSampleMoviesCollection() {
ctx := test.Context()
name := randomName("movies")
name := test.RandomName("movies")

_, err := s.rc.CreateCollection(ctx, s.ws, name,
option.WithStorageCompressionType(option.StorageCompressionLZ4),
Expand All @@ -113,7 +113,7 @@ func (s *CollectionTestSuite) TestCreateSampleMoviesCollection() {

func (s *CollectionTestSuite) TestUpdateCollection() {
ctx := test.Context()
name := randomName("update")
name := test.RandomName("update")

_, err := s.rc.CreateCollection(ctx, s.ws, name,
option.WithStorageCompressionType(option.StorageCompressionZSTD),
Expand Down
4 changes: 2 additions & 2 deletions integrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *IntegrationTestSuite) TestCreateS3Integration() {

_, err := s.rc.CreateS3Integration(ctx, s.s3Integration,
option.AWSRole("arn:aws:iam::469279130686:role/rockset-s3-integration"),
option.WithS3IntegrationDescription(description()))
option.WithS3IntegrationDescription(test.Description()))
s.Require().NoError(err)

err = s.rc.DeleteIntegration(ctx, s.s3Integration)
Expand All @@ -73,7 +73,7 @@ func (s *IntegrationTestSuite) TestCreateGCSIntegration() {
ctx := test.Context()

_, err := s.rc.CreateGCSIntegration(ctx, s.gcsIntegration, saKeyFile,
option.WithGCSIntegrationDescription(description()))
option.WithGCSIntegrationDescription(test.Description()))
s.Require().NoError(err)

err = s.rc.DeleteIntegration(ctx, s.gcsIntegration)
Expand Down
6 changes: 6 additions & 0 deletions internal/test/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ func Client(t *testing.T) *rockset.RockClient {
require.NoError(t, err)
return rc
}

func DebugClient(t *testing.T) *rockset.RockClient {
rc, err := rockset.NewClient(rockset.WithUserAgent("rockset-go-integration-tests"), rockset.WithHTTPDebug())
require.NoError(t, err)
return rc
}
37 changes: 37 additions & 0 deletions internal/test/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package test

import (
"context"

"github.com/rockset/rockset-go-client/retry"
)

// Retrier is used for testing
type Retrier struct{}

func (t Retrier) Retry(ctx context.Context, retryFn retry.Func) error {
for {
err := retryFn()
if err == nil {
return nil
}
if !retry.DefaultRetryableErrorCheck(err) {
return err
}
}
}

func (t Retrier) RetryWithCheck(ctx context.Context, checkFunc retry.CheckFn) error {
for {
r, err := checkFunc()
if err == nil {
return nil
}
if !r {
return nil
}
if !retry.DefaultRetryableErrorCheck(err) {
return err
}
}
}
54 changes: 54 additions & 0 deletions internal/test/string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package test

import (
"fmt"
"math/rand"
"os"
"strings"
"time"
)

// these are used for testing when a persistent value is needed
const buildNum = "CIRCLE_BUILD_NUM"
const PersistentWorkspace = "persistent"
const PersistentCollection = "snp"
const PersistentAlias = "getalias"

const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))

// StringWithCharset creates a random string with length and charset
func stringWithCharset(length int, charset string) string {
b := make([]byte, length)
for i := range b {
b[i] = charset[seededRand.Intn(len(charset))]
}
return string(b)
}

// RandomString creates a random string with a specific length
func RandomString(length int) string {
return stringWithCharset(length, charset)
}

func RandomName(prefix string) string {
num, found := os.LookupEnv(buildNum)
if !found {
if user, found := os.LookupEnv("USER"); found {
num = strings.ToLower(user)
} else {
num = "dev"
}
}

return fmt.Sprintf("go_%s_%s_%s", num, prefix, RandomString(6))
}

func Description() string {
num, found := os.LookupEnv(buildNum)
if !found {
num = "dev"
}
return fmt.Sprintf("created by terraform integration test run %s", num)
}
5 changes: 3 additions & 2 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"github.com/rockset/rockset-go-client"
"github.com/rockset/rockset-go-client/internal/test"
"github.com/rockset/rockset-go-client/option"
"github.com/stretchr/testify/require"
"io"
Expand All @@ -26,7 +27,7 @@ type kafkaConfig struct {

func testKafka(ctx context.Context, t *testing.T, rc *rockset.RockClient, kc kafkaConfig) {
i, err := rc.CreateKafkaIntegration(ctx, kc.integrationName, option.WithKafkaDataFormat(option.KafkaFormatJSON),
option.WithKafkaIntegrationTopic(kc.topic), option.WithKafkaIntegrationDescription(description()))
option.WithKafkaIntegrationTopic(kc.topic), option.WithKafkaIntegrationDescription(test.Description()))
require.NoError(t, err)

u := fmt.Sprintf("https://%s", os.Getenv("ROCKSET_APISERVER"))
Expand Down Expand Up @@ -55,7 +56,7 @@ func testKafka(ctx context.Context, t *testing.T, rc *rockset.RockClient, kc kaf

_, err = rc.CreateKafkaCollection(ctx, kc.workspace, kc.collection,
option.WithKafkaSource(kc.integrationName, kc.topic, option.KafkaStartingOffsetEarliest, option.WithJSONFormat()),
option.WithCollectionDescription(description()))
option.WithCollectionDescription(test.Description()))
require.NoError(t, err)

t.Log("waiting for collection to start receiving documents...")
Expand Down
14 changes: 7 additions & 7 deletions query_lambdas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestRockClient_CreateQueryLambda(t *testing.T) {
name := randomName("ql")

ql, err := rc.CreateQueryLambda(ctx, ws, name, "SELECT 1",
option.WithDefaultParameter("", "", ""), option.WithQueryLambdaDescription(description()))
option.WithDefaultParameter("", "", ""), option.WithQueryLambdaDescription(test.Description()))
require.NoError(t, err)

defer func() {
Expand All @@ -44,7 +44,7 @@ func TestRockClient_GetQueryLambdaVersionByTag(t *testing.T) {
rc, _ := vcrTestClient(t, t.Name())
ctx := test.Context()

version, err := rc.GetQueryLambdaVersionByTag(ctx, persistentWorkspace, qlName, qlTag)
version, err := rc.GetQueryLambdaVersionByTag(ctx, test.PersistentWorkspace, qlName, qlTag)
require.NoError(t, err)
assert.Equal(t, qlVersion, version.Version.GetVersion())
}
Expand All @@ -53,7 +53,7 @@ func TestRockClient_GetQueryLambdaVersion(t *testing.T) {
rc, _ := vcrTestClient(t, t.Name())
ctx := test.Context()

version, err := rc.GetQueryLambdaVersion(ctx, persistentWorkspace, qlName, qlVersion)
version, err := rc.GetQueryLambdaVersion(ctx, test.PersistentWorkspace, qlName, qlVersion)
require.NoError(t, err)
assert.Equal(t, qlName, version.GetName())
}
Expand All @@ -74,19 +74,19 @@ func TestRockClient_ListQueryLambdas_workspace(t *testing.T) {
rc, _ := vcrTestClient(t, t.Name())
ctx := test.Context()

lambdas, err := rc.ListQueryLambdas(ctx, option.WithQueryLambdaWorkspace(persistentWorkspace))
lambdas, err := rc.ListQueryLambdas(ctx, option.WithQueryLambdaWorkspace(test.PersistentWorkspace))
require.NoError(t, err)

for _, l := range lambdas {
assert.Equal(t, persistentWorkspace, l.GetWorkspace())
assert.Equal(t, test.PersistentWorkspace, l.GetWorkspace())
}
}

func TestRockClient_ListQueryLambdaVersions(t *testing.T) {
rc, _ := vcrTestClient(t, t.Name())
ctx := test.Context()

versions, err := rc.ListQueryLambdaVersions(ctx, persistentWorkspace, qlName)
versions, err := rc.ListQueryLambdaVersions(ctx, test.PersistentWorkspace, qlName)
require.NoError(t, err)

for _, l := range versions {
Expand All @@ -98,7 +98,7 @@ func TestRockClient_ListQueryLambdaTags(t *testing.T) {
rc, _ := vcrTestClient(t, t.Name())
ctx := test.Context()

tags, err := rc.ListQueryLambdaTags(ctx, persistentWorkspace, qlName)
tags, err := rc.ListQueryLambdaTags(ctx, test.PersistentWorkspace, qlName)
require.NoError(t, err)

for _, tag := range tags {
Expand Down
Loading

0 comments on commit 1420a2b

Please sign in to comment.