Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to latest openapi spec #66

Merged
merged 3 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 41 additions & 35 deletions collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,45 +78,45 @@ func (rc *RockClient) DeleteCollection(ctx context.Context, workspace, name stri
}

// CreateCollection is used to create a collection from one or more sources:
// - DynamoDB (see CreateDynamoDBIntegration())
// - GCS (see CreateGCSIntegration())
// - Kafka (see CreateKafkaIntegration())
// - Kinesis (see CreateKinesisIntegration())
// - MongoDB (see CreateMongoDBIntegration())
// - S3 (see CreateS3Integration())
// - DynamoDB (see CreateDynamoDBIntegration())
// - GCS (see CreateGCSIntegration())
// - Kafka (see CreateKafkaIntegration())
// - Kinesis (see CreateKinesisIntegration())
// - MongoDB (see CreateMongoDBIntegration())
// - S3 (see CreateS3Integration())
//
// It uses exponential backoff in case the API call is rate-limted.
//
// To create a collection from multiple sources, use:
// c, err := rc.CreateCollection(ctx, "commons", "example",
// option.WithCollectionDescription("created by go example code"),
// option.WithS3Source("s3-integration-name", "rockset-go-tests",
// option.WithCSVFormat(
// []string{"city", "country", "population", "visited"},
// []option.ColumnType{
// option.ColumnTypeString, option.ColumnTypeString, option.ColumnTypeInteger, option.ColumnTypeBool,
// },
// option.WithEncoding("UTF-8"),
// option.WithEscapeChar("\\"),
// option.WithQuoteChar(`"`),
// option.WithSeparator(","),
// ),
// option.WithS3Prefix("cities.csv"),
// ),
// option.WithKafkaSource("kafka-integration-name", "topic", option.KafkaStartingOffsetEarliest, option.WithJSONFormat(),
// option.WithKafkaSourceV3(),
// ),
// option.WithCollectionRetention(time.Hour),
// option.WithInsertOnly(),
// option.WithFieldMappingQuery("SELECT * FROM _input"),
// )
//
// c, err := rc.CreateCollection(ctx, "commons", "example",
// option.WithCollectionDescription("created by go example code"),
// option.WithS3Source("s3-integration-name", "rockset-go-tests",
// option.WithCSVFormat(
// []string{"city", "country", "population", "visited"},
// []option.ColumnType{
// option.ColumnTypeString, option.ColumnTypeString, option.ColumnTypeInteger, option.ColumnTypeBool,
// },
// option.WithEncoding("UTF-8"),
// option.WithEscapeChar("\\"),
// option.WithQuoteChar(`"`),
// option.WithSeparator(","),
// ),
// option.WithS3Prefix("cities.csv"),
// ),
// option.WithKafkaSource("kafka-integration-name", "topic", option.KafkaStartingOffsetEarliest, option.WithJSONFormat(),
// option.WithKafkaSourceV3(),
// ),
// option.WithCollectionRetention(time.Hour),
// option.WithFieldMappingQuery("SELECT * FROM _input"),
// )
func (rc *RockClient) CreateCollection(ctx context.Context, workspace, name string,
options ...option.CollectionOption) (openapi.Collection, error) {
var err error
var resp *openapi.CreateCollectionResponse

request := openapi.CreateCollectionRequest{}
request.Name = name
request.Name = &name

for _, o := range options {
o(&request)
Expand Down Expand Up @@ -165,7 +165,8 @@ func (rc *RockClient) CreateKinesisCollection(ctx context.Context,
var resp *openapi.CreateCollectionResponse

createReq := rc.CollectionsApi.CreateCollection(ctx, workspace)
createParams := openapi.NewCreateCollectionRequest(name)
createParams := openapi.NewCreateCollectionRequest()
createParams.Name = &name
createParams.Description = &description

f := openapi.FormatParams{}
Expand Down Expand Up @@ -205,7 +206,8 @@ func (rc *RockClient) CreateGCSCollection(ctx context.Context,
var resp *openapi.CreateCollectionResponse

createReq := rc.CollectionsApi.CreateCollection(ctx, workspace)
createParams := openapi.NewCreateCollectionRequest(name)
createParams := openapi.NewCreateCollectionRequest()
createParams.Name = &name
createParams.Description = &description

f := openapi.FormatParams{}
Expand Down Expand Up @@ -245,7 +247,8 @@ func (rc *RockClient) CreateDynamoDBCollection(ctx context.Context,
var resp *openapi.CreateCollectionResponse

createReq := rc.CollectionsApi.CreateCollection(ctx, workspace)
createParams := openapi.NewCreateCollectionRequest(name)
createParams := openapi.NewCreateCollectionRequest()
createParams.Name = &name
createParams.Description = &description

f := openapi.FormatParams{}
Expand Down Expand Up @@ -286,7 +289,8 @@ func (rc *RockClient) CreateFileUploadCollection(ctx context.Context,
var resp *openapi.CreateCollectionResponse

createReq := rc.CollectionsApi.CreateCollection(ctx, workspace)
createParams := openapi.NewCreateCollectionRequest(name)
createParams := openapi.NewCreateCollectionRequest()
createParams.Name = &name
createParams.Description = &description

f := openapi.FormatParams{}
Expand Down Expand Up @@ -340,7 +344,8 @@ func (rc *RockClient) CreateKafkaCollection(ctx context.Context, workspace, name
var resp *openapi.CreateCollectionResponse

createReq := rc.CollectionsApi.CreateCollection(ctx, workspace)
createParams := openapi.NewCreateCollectionRequest(name)
createParams := openapi.NewCreateCollectionRequest()
createParams.Name = &name

createParams.Sources = []openapi.Source{}

Expand All @@ -367,7 +372,8 @@ func (rc *RockClient) CreateMongoDBCollection(ctx context.Context,
var resp *openapi.CreateCollectionResponse

createReq := rc.CollectionsApi.CreateCollection(ctx, workspace)
createParams := openapi.NewCreateCollectionRequest(name)
createParams := openapi.NewCreateCollectionRequest()
createParams.Name = &name
createParams.Description = &description

f := openapi.FormatParams{}
Expand Down
1 change: 1 addition & 0 deletions collections_cc_kc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ConfluentCloudWithKafkaConnectIntegrationSuite struct {

// Test creating an integration and collection for Confluent Cloud with a local kafka-connect
func TestConfluentCloudWithKafkaConnectIntegrationSuite(t *testing.T) {
t.Skip("skipping kafka tests - too flakey :(")
skipUnlessIntegrationTest(t)
skipUnlessDocker(t)

Expand Down
1 change: 1 addition & 0 deletions collections_cc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ConfluentCloudIntegrationSuite struct {

// Test creating an integration and collection for Confluent Cloud
func TestConfluentCloudIntegrationSuite(t *testing.T) {
t.Skip("skipping kafka tests - too flakey :(")
skipUnlessIntegrationTest(t)

rc, err := rockset.NewClient()
Expand Down
3 changes: 2 additions & 1 deletion collections_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type KafkaIntegrationSuite struct {

// Test creating an integration and collection for a self-managed kafka with local kafka-connect
func TestKafkaIntegrationSuite(t *testing.T) {
t.Skip("skipping kafka tests - too flakey :(")
skipUnlessIntegrationTest(t)
skipUnlessDocker(t)

Expand Down Expand Up @@ -158,7 +159,7 @@ func (s *KafkaIntegrationSuite) SetupSuite() {

s.connect, err = s.dockerPool.RunWithOptions(&dockertest.RunOptions{
Repository: "rockset/kafka-connect",
Tag: "1.4.2-5",
Tag: "latest",
Hostname: "connect",
Env: environment(s.bootstrapServers, s.confluentKey, s.confluentSecret, option.KafkaFormatJSON),
PortBindings: map[docker.Port][]docker.PortBinding{
Expand Down
1 change: 0 additions & 1 deletion example_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func Example_s3() {
),
option.WithS3Prefix("cities.csv"),
),
option.WithInsertOnly(),
option.WithFieldMappingQuery("SELECT * FROM _input"),
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/rs/zerolog v1.28.0
github.com/seborama/govcr v4.5.0+incompatible
github.com/stretchr/testify v1.8.0
golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094
golang.org/x/oauth2 v0.1.0
)

require (
Expand Down Expand Up @@ -42,8 +42,8 @@ require (
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458 // indirect
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,12 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458 h1:MgJ6t2zo8v0tbmLCueaCbF1RM+TtB0rs3Lv8DGtOIpY=
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 h1:2o1E+E8TpNLklK9nHiPiK1uzIYrIHt+cQx3ynCwq9V8=
golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/oauth2 v0.1.0 h1:isLCZuhj4v+tYv7eskaN4v/TM+A1begWWgyVJDdl1+Y=
golang.org/x/oauth2 v0.1.0/go.mod h1:G9FE4dLTsbXUu90h/Pf85g4w1D+SSAgR+q46nJZ8M4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -274,8 +274,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
30 changes: 0 additions & 30 deletions integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,36 +230,6 @@ func (rc *RockClient) CreateGCSIntegration(ctx context.Context, name, serviceAcc
return resp.GetData(), nil
}

func (rc *RockClient) CreateSegmentIntegration(ctx context.Context, name, connectionString string,
options ...option.SegmentIntegrationOption) (openapi.Integration, error) {
var err error
var resp *openapi.CreateIntegrationResponse

q := rc.IntegrationsApi.CreateIntegration(ctx)
req := openapi.NewCreateIntegrationRequest(name)

opts := option.SegmentIntegration{}
for _, o := range options {
o(&opts)
}

req.Segment.ConnectionString = &connectionString
if opts.Description != nil {
req.Description = opts.Description
}

err = rc.Retry(ctx, func() error {
resp, _, err = q.Body(*req).Execute()
return err
})

if err != nil {
return openapi.Integration{}, err
}

return resp.GetData(), nil
}

// CreateKafkaIntegration create a new integration for a Kafka source.
// If no format is specified, it defaults to JSON.
func (rc *RockClient) CreateKafkaIntegration(ctx context.Context, name string,
Expand Down
Loading