From 5b4a21b4dbd1f5e04f7f8e610f7dacc1e408266d Mon Sep 17 00:00:00 2001 From: yoshi-code-bot <70984784+yoshi-code-bot@users.noreply.github.com> Date: Thu, 21 Jul 2022 15:21:18 -0700 Subject: [PATCH 1/6] chore: sync main to storage-refactor (#6386) * chore: release main (#6351) * test(profiler): compile busybench before running backoff test (#6375) * chore(bigquery/storage/managedwriter): augment test logging (#6373) * chore(storage): RewriteObject implementation (#6313) * chore(storage): RewriteObject implementation * address feedback * refactor source/destination object types * address feedback * address feedback * fix test * chore(main): release storage 1.24.0 (#6336) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: Chris Cotter * chore(internal/gapicgen): update microgen v0.31.2 (#6383) Only includes fixes to regapic generation. * test(bigquery/storage/managedwriter): relax error checking (#6385) When a user issues a large request, the response from the backend is a bare "InvalidArgument". This PR removes additional validation on information that is only attached when interrogating the backend from a known client; it's stripped in the normal case. Internal issue 239740070 was created to address the unactionable nature of the response. Fixes: https://github.com/googleapis/google-cloud-go/issues/6361 * feat(firestore): adds Bulkwriter support to Firestore client (#5946) * feat: adds Bulkwriter support to Firestore client * test(storage): unflake TestIntegration_ACL (#6392) Few minor changes to make sure potentially flaky and/or eventually consistent operations for ACLs are retried appropriately. Fixes #6379 Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: Amarin (Um) Phaosawasdi Co-authored-by: shollyman Co-authored-by: Noah Dietz Co-authored-by: Chris Cotter Co-authored-by: Eric Schmidt --- .release-please-manifest-submodules.json | 6 +- assuredworkloads/CHANGES.md | 8 + assuredworkloads/internal/version.go | 2 +- .../storage/managedwriter/integration_test.go | 13 +- dataplex/CHANGES.md | 7 + dataplex/internal/version.go | 2 +- firestore/bulkwriter.go | 330 ++++++++++++++++++ firestore/bulkwriter_test.go | 239 +++++++++++++ firestore/client.go | 12 + firestore/go.mod | 2 + firestore/go.sum | 3 + firestore/integration_test.go | 53 +++ firestore/mock_test.go | 8 + firestore/writebatch.go | 4 + internal/gapicgen/cmd/genbot/Dockerfile | 2 +- profiler/integration_test.go | 6 +- profiler/kokoro/integration_test.sh | 11 +- storage/.release-please-manifest.json | 2 +- storage/CHANGES.md | 7 + storage/client.go | 35 +- storage/client_test.go | 49 ++- storage/grpc_client.go | 51 ++- storage/http_client.go | 50 ++- storage/integration_test.go | 6 +- storage/internal/version.go | 2 +- storage/storage.go | 34 +- vmmigration/CHANGES.md | 7 + vmmigration/internal/version.go | 2 +- 28 files changed, 897 insertions(+), 56 deletions(-) create mode 100644 firestore/bulkwriter.go create mode 100644 firestore/bulkwriter_test.go diff --git a/.release-please-manifest-submodules.json b/.release-please-manifest-submodules.json index d1a56c25948b..2fcae63c2811 100644 --- a/.release-please-manifest-submodules.json +++ b/.release-please-manifest-submodules.json @@ -9,7 +9,7 @@ "area120": "0.4.0", "artifactregistry": "1.3.0", "asset": "1.3.0", - "assuredworkloads": "1.1.0", + "assuredworkloads": "1.2.0", "automl": "1.4.0", "baremetalsolution": "0.2.0", "batch": "0.1.0", @@ -30,7 +30,7 @@ "dataform": 
"0.1.0", "datafusion": "1.3.0", "datalabeling": "0.3.0", - "dataplex": "1.0.0", + "dataplex": "1.1.0", "dataproc": "1.5.0", "dataqna": "0.4.0", "datastream": "1.0.0", @@ -101,7 +101,7 @@ "video": "1.7.0", "videointelligence": "1.4.0", "vision/v2": "2.0.0", - "vmmigration": "1.0.0", + "vmmigration": "1.1.0", "vpcaccess": "1.2.0", "webrisk": "1.3.0", "websecurityscanner": "1.2.0", diff --git a/assuredworkloads/CHANGES.md b/assuredworkloads/CHANGES.md index 2404350609b6..6532a92194fb 100644 --- a/assuredworkloads/CHANGES.md +++ b/assuredworkloads/CHANGES.md @@ -1,6 +1,14 @@ # Changes +## [1.2.0](https://github.com/googleapis/google-cloud-go/compare/assuredworkloads/v1.1.0...assuredworkloads/v1.2.0) (2022-07-19) + + +### Features + +* **assuredworkloads:** Updated the method signature of analyzeWorkloadMove for v1beta API ([53246aa](https://github.com/googleapis/google-cloud-go/commit/53246aa18cb9c79471ecc84878b5e3f166086404)) +* **assuredworkloads:** Updated the method signature of analyzeWorkloadMove for v1beta API to accept project as source. AnalyzeWorkloadMove now also returns information about org policy differences between the project and target folder ([53246aa](https://github.com/googleapis/google-cloud-go/commit/53246aa18cb9c79471ecc84878b5e3f166086404)) + ## [1.1.0](https://github.com/googleapis/google-cloud-go/compare/assuredworkloads/v1.0.0...assuredworkloads/v1.1.0) (2022-07-12) diff --git a/assuredworkloads/internal/version.go b/assuredworkloads/internal/version.go index 6aea1adc121f..d0e4a6fd16b3 100644 --- a/assuredworkloads/internal/version.go +++ b/assuredworkloads/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.1.0" +const Version = "1.2.0" diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index 4043d32bddad..e252c5bdb8fb 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "math" - "strings" "sync" "testing" "time" @@ -520,18 +519,10 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie if !ok { t.Errorf("GetResult error was not an instance of ApiError") } - if status := apiErr.GRPCStatus(); status.Code() != codes.InvalidArgument { + status := apiErr.GRPCStatus() + if status.Code() != codes.InvalidArgument { t.Errorf("expected InvalidArgument status, got %v", status) } - - details := apiErr.Details() - if details.DebugInfo == nil { - t.Errorf("expected DebugInfo to be populated, was nil") - } - wantSubstring := "Message size exceed the limitation of byte based flow control." - if detail := details.DebugInfo.GetDetail(); !strings.Contains(detail, wantSubstring) { - t.Errorf("detail missing desired substring: %s", detail) - } } // send a subsequent append as verification we can proceed. 
result, err = ms.AppendRows(ctx, [][]byte{b}) diff --git a/dataplex/CHANGES.md b/dataplex/CHANGES.md index ad2bd7a4db32..338eebd24e46 100644 --- a/dataplex/CHANGES.md +++ b/dataplex/CHANGES.md @@ -1,6 +1,13 @@ # Changes +## [1.1.0](https://github.com/googleapis/google-cloud-go/compare/dataplex/v1.0.0...dataplex/v1.1.0) (2022-07-19) + + +### Features + +* **dataplex:** Add IAM support for Explore content APIs feat: Add support for custom container for Task feat: Add support for cross project for Task feat: Add support for custom encryption key to be used for encrypt data on the PDs associated with the VMs in your Dataproc cluster for Task feat: Add support for Latest job in Task resource feat: User mode filter in Explore list sessions API feat: Support logging sampled file paths per partition to Cloud logging for Discovery event ([8b17366](https://github.com/googleapis/google-cloud-go/commit/8b17366c46bbd8a0b2adf39ec3b058eb83192933)) + ## [1.0.0](https://github.com/googleapis/google-cloud-go/compare/dataplex/v0.4.0...dataplex/v1.0.0) (2022-06-29) diff --git a/dataplex/internal/version.go b/dataplex/internal/version.go index db6d2e3e99d1..6aea1adc121f 100644 --- a/dataplex/internal/version.go +++ b/dataplex/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.0.0" +const Version = "1.1.0" diff --git a/firestore/bulkwriter.go b/firestore/bulkwriter.go new file mode 100644 index 000000000000..be35be8614c6 --- /dev/null +++ b/firestore/bulkwriter.go @@ -0,0 +1,330 @@ +package firestore + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + vkit "cloud.google.com/go/firestore/apiv1" + "golang.org/x/time/rate" + "google.golang.org/api/support/bundler" + pb "google.golang.org/genproto/googleapis/firestore/v1" +) + +const ( + // maxBatchSize is the max number of writes to send in a request + maxBatchSize = 20 + // maxRetryAttempts is the max number of times to retry a write + maxRetryAttempts = 10 + // defaultStartingMaximumOpsPerSecond is the starting max number of requests to the service per second + defaultStartingMaximumOpsPerSecond = 500 + // maxWritesPerSecond is the starting limit of writes allowed to callers per second + maxWritesPerSecond = maxBatchSize * defaultStartingMaximumOpsPerSecond +) + +// bulkWriterResult contains the WriteResult or error results from an individual +// write to the database. +type bulkWriterResult struct { + result *pb.WriteResult // (cached) result from the operation + err error // (cached) any errors that occurred +} + +// BulkWriterJob provides read-only access to the results of a BulkWriter write attempt. +type BulkWriterJob struct { + resultChan chan bulkWriterResult // send errors and results to this channel + write *pb.Write // the writes to apply to the database + attempts int // number of times this write has been attempted + resultsLock sync.Mutex // guards the cached wr and e values for the job + result *WriteResult // (cached) result from the operation + err error // (cached) any errors that occurred + ctx context.Context // context for canceling/timing out results +} + +// Results gets the results of the BulkWriter write attempt. +// This method blocks if the results for this BulkWriterJob haven't been +// received. 
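// An illustrative call pattern (editor's sketch based on this patch; doc and
// data are assumed to exist, bw is a started BulkWriter):
//
//	job, err := bw.Create(doc, data)
//	if err != nil {
//		// the write was rejected before being enqueued
//	}
//	bw.Flush()                // push enqueued writes to the service
//	res, err := job.Results() // blocks until this write's result arrives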
+func (j *BulkWriterJob) Results() (*WriteResult, error) { + j.resultsLock.Lock() + defer j.resultsLock.Unlock() + if j.result == nil && j.err == nil { + j.result, j.err = j.processResults() // cache the results for additional calls + } + return j.result, j.err +} + +// processResults checks for errors returned from send() and packages up the +// results as WriteResult objects +func (j *BulkWriterJob) processResults() (*WriteResult, error) { + select { + case <-j.ctx.Done(): + return nil, j.ctx.Err() + case bwr := <-j.resultChan: + if bwr.err != nil { + return nil, bwr.err + } + return writeResultFromProto(bwr.result) + } +} + +// setError ensures that an error is returned on the error channel of BulkWriterJob. +func (j *BulkWriterJob) setError(e error) { + bwr := bulkWriterResult{ + err: e, + result: nil, + } + j.resultChan <- bwr + close(j.resultChan) +} + +// A BulkWriter supports concurrent writes to multiple documents. The BulkWriter +// submits document writes in maximum batches of 20 writes per request. Each +// request can contain many different document writes: create, delete, update, +// and set are all supported. +// +// Only one operation (create, set, update, delete) per document is allowed. +// BulkWriter cannot promise atomicity: individual writes can fail or succeed +// independent of each other. Bulkwriter does not apply writes in any set order; +// thus a document can't have set on it immediately after creation. +type BulkWriter struct { + database string // the database as resource name: projects/[PROJECT]/databases/[DATABASE] + start time.Time // when this BulkWriter was started; used to calculate qps and rate increases + vc *vkit.Client // internal client + maxOpsPerSecond int // number of requests that can be sent per second + docUpdatePaths map[string]bool // document paths with corresponding writes in the queue + limiter rate.Limiter // limit requests to server to <= 500 qps + bundler *bundler.Bundler // handle bundling up writes to Firestore + ctx context.Context // context for canceling all BulkWriter operations + isOpenLock sync.RWMutex // guards against setting isOpen concurrently + isOpen bool // flag that the BulkWriter is closed +} + +// newBulkWriter creates a new instance of the BulkWriter. +func newBulkWriter(ctx context.Context, c *Client, database string) *BulkWriter { + // Although typically we shouldn't store Context objects, in this case we + // need to pass this Context through to the Bundler handler. + ctx = withResourceHeader(ctx, c.path()) + + bw := &BulkWriter{ + database: database, + start: time.Now(), + vc: c.c, + isOpen: true, + maxOpsPerSecond: defaultStartingMaximumOpsPerSecond, + docUpdatePaths: make(map[string]bool), + ctx: ctx, + limiter: *rate.NewLimiter(rate.Limit(maxWritesPerSecond), 1), + } + + // can't initialize within struct above; need instance reference to BulkWriter.send() + bw.bundler = bundler.NewBundler(&BulkWriterJob{}, bw.send) + bw.bundler.HandlerLimit = bw.maxOpsPerSecond + bw.bundler.BundleCountThreshold = maxBatchSize + + return bw +} + +// End sends all enqueued writes in parallel and closes the BulkWriter to new requests. +// After calling End(), calling any additional method automatically returns +// with an error. This method completes when there are no more pending writes +// in the queue. +func (bw *BulkWriter) End() { + bw.isOpenLock.Lock() + bw.isOpen = false + bw.isOpenLock.Unlock() + bw.Flush() +} + +// Flush commits all writes that have been enqueued up to this point in parallel. 
+// This method blocks execution. +func (bw *BulkWriter) Flush() { + bw.bundler.Flush() +} + +// Create adds a document creation write to the queue of writes to send. +// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. +func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJob, error) { + bw.isOpenLock.RLock() + defer bw.isOpenLock.RUnlock() + err := bw.checkWriteConditions(doc) + if err != nil { + return nil, err + } + + w, err := doc.newCreateWrites(datum) + if err != nil { + return nil, fmt.Errorf("firestore: cannot create %v with %v", doc.ID, datum) + } + + if len(w) > 1 { + return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter") + } + + j := bw.write(w[0]) + return j, nil +} + +// Delete adds a document deletion write to the queue of writes to send. +// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. +func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkWriterJob, error) { + bw.isOpenLock.RLock() + defer bw.isOpenLock.RUnlock() + err := bw.checkWriteConditions(doc) + if err != nil { + return nil, err + } + + w, err := doc.newDeleteWrites(preconds) + if err != nil { + return nil, fmt.Errorf("firestore: cannot delete doc %v", doc.ID) + } + + if len(w) > 1 { + return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter") + } + + j := bw.write(w[0]) + return j, nil +} + +// Set adds a document set write to the queue of writes to send. +// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. +func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption) (*BulkWriterJob, error) { + bw.isOpenLock.RLock() + defer bw.isOpenLock.RUnlock() + err := bw.checkWriteConditions(doc) + if err != nil { + return nil, err + } + + w, err := doc.newSetWrites(datum, opts) + if err != nil { + return nil, fmt.Errorf("firestore: cannot set %v on doc %v", datum, doc.ID) + } + + if len(w) > 1 { + return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter") + } + + j := bw.write(w[0]) + return j, nil +} + +// Update adds a document update write to the queue of writes to send. +// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. +func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Precondition) (*BulkWriterJob, error) { + bw.isOpenLock.RLock() + defer bw.isOpenLock.RUnlock() + err := bw.checkWriteConditions(doc) + if err != nil { + return nil, err + } + + w, err := doc.newUpdatePathWrites(updates, preconds) + if err != nil { + return nil, fmt.Errorf("firestore: cannot update doc %v", doc.ID) + } + + if len(w) > 1 { + return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter") + } + + j := bw.write(w[0]) + return j, nil +} + +// checkConditions determines whether this write attempt is valid. It returns +// an error if either the BulkWriter has already been closed or if it +// receives a nil document reference. 
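// The check also rejects a second write to a document that already has a
// queued write, since BulkWriter allows only one operation per document.
// Illustrative sketch (doc and data assumed):
//
//	j1, _ := bw.Create(doc, data) // ok
//	j2, err := bw.Delete(doc)     // err: duplicate write for path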
+func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error { + if !bw.isOpen { + return errors.New("firestore: BulkWriter has been closed") + } + + if doc == nil { + return errors.New("firestore: nil document contents") + } + + _, havePath := bw.docUpdatePaths[doc.shortPath] + if havePath { + return fmt.Errorf("firestore: BulkWriter received duplicate write for path: %v", doc.shortPath) + } + + bw.docUpdatePaths[doc.shortPath] = true + + return nil +} + +// write packages up write requests into bulkWriterJob objects. +func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob { + + j := &BulkWriterJob{ + resultChan: make(chan bulkWriterResult, 1), + write: w, + ctx: bw.ctx, + } + + bw.limiter.Wait(bw.ctx) + // ignore operation size constraints and related errors; can't be inferred at compile time + // Bundler is set to accept an unlimited amount of bytes + _ = bw.bundler.Add(j, 0) + + return j +} + +// send transmits writes to the service and matches response results to job channels. +func (bw *BulkWriter) send(i interface{}) { + bwj := i.([]*BulkWriterJob) + + if len(bwj) == 0 { + return + } + + var ws []*pb.Write + for _, w := range bwj { + ws = append(ws, w.write) + } + + bwr := &pb.BatchWriteRequest{ + Database: bw.database, + Writes: ws, + Labels: map[string]string{}, + } + + select { + case <-bw.ctx.Done(): + return + default: + resp, err := bw.vc.BatchWrite(bw.ctx, bwr) + if err != nil { + // Do we need to be selective about what kind of errors we send? + for _, j := range bwj { + j.setError(err) + } + return + } + // Match write results with BulkWriterJob objects + for i, res := range resp.WriteResults { + s := resp.Status[i] + c := s.GetCode() + if c != 0 { // Should we do an explicit check against rpc.Code enum? + j := bwj[i] + j.attempts++ + + // Do we need separate retry bundler? + if j.attempts < maxRetryAttempts { + // ignore operation size constraints and related errors; job size can't be inferred at compile time + // Bundler is set to accept an unlimited amount of bytes + _ = bw.bundler.Add(j, 0) + } else { + j.setError(fmt.Errorf("firestore: write failed with status: %v", s)) + } + continue + } + + bwj[i].resultChan <- bulkWriterResult{err: nil, result: res} + close(bwj[i].resultChan) + } + } +} diff --git a/firestore/bulkwriter_test.go b/firestore/bulkwriter_test.go new file mode 100644 index 000000000000..3349e3b7089e --- /dev/null +++ b/firestore/bulkwriter_test.go @@ -0,0 +1,239 @@ +package firestore + +import ( + "context" + "testing" + + pb "google.golang.org/genproto/googleapis/firestore/v1" + "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" +) + +type bulkwriterTestCase struct { + name string + test func(*BulkWriter) (*BulkWriterJob, error) +} + +// setupMockServer adds expected write requests and correct mocked responses +// to the mockServer. 
+func setupMockServer(c *Client, docPrefix string, srv *mockServer) { + // Create + srv.addRPC( + &pb.BatchWriteRequest{ + Database: c.path(), + Writes: []*pb.Write{ + { + Operation: &pb.Write_Update{ + Update: &pb.Document{ + Name: docPrefix + "a", + Fields: testFields, + }, + }, + CurrentDocument: &pb.Precondition{ + ConditionType: &pb.Precondition_Exists{ + Exists: false, + }, + }, + }, + }, + }, + &pb.BatchWriteResponse{ + WriteResults: []*pb.WriteResult{ + {UpdateTime: aTimestamp}, + }, + Status: []*status.Status{ + { + Code: int32(codes.OK), + Message: "create test successful", + }, + }, + }, + ) + + // Set + srv.addRPC( + &pb.BatchWriteRequest{ + Database: c.path(), + Writes: []*pb.Write{ + { + Operation: &pb.Write_Update{ + Update: &pb.Document{ + Name: docPrefix + "b", + Fields: testFields, + }, + }, + }, + }, + }, + &pb.BatchWriteResponse{ + WriteResults: []*pb.WriteResult{ + {UpdateTime: aTimestamp2}, + }, + Status: []*status.Status{ + { + Code: int32(codes.OK), + Message: "set test successful", + }, + }, + }, + ) + + // Delete + srv.addRPC( + &pb.BatchWriteRequest{ + Database: c.path(), + Writes: []*pb.Write{ + { + Operation: &pb.Write_Delete{ + Delete: docPrefix + "c", + }, + }, + }, + }, + &pb.BatchWriteResponse{ + WriteResults: []*pb.WriteResult{ + {UpdateTime: aTimestamp3}, + }, + Status: []*status.Status{ + { + Code: int32(codes.OK), + Message: "delete test successful", + }, + }, + }, + ) + + // Update + srv.addRPC( + &pb.BatchWriteRequest{ + Database: c.path(), + Writes: []*pb.Write{ + { + Operation: &pb.Write_Update{ + Update: &pb.Document{ + Name: docPrefix + "f", + Fields: map[string]*pb.Value{"*": intval(3)}, + }, + }, + UpdateMask: &pb.DocumentMask{FieldPaths: []string{"`*`"}}, + CurrentDocument: &pb.Precondition{ + ConditionType: &pb.Precondition_Exists{ + Exists: true, + }, + }, + }, + }, + }, + &pb.BatchWriteResponse{ + WriteResults: []*pb.WriteResult{ + {UpdateTime: aTimestamp3}, + }, + Status: []*status.Status{ + { + Code: int32(codes.OK), + Message: "update test successful", + }, + }, + }, + ) +} + +func TestBulkWriter(t *testing.T) { + c, srv, cleanup := newMock(t) + defer cleanup() + + docPrefix := c.Collection("C").Path + "/" + + setupMockServer(c, docPrefix, srv) + + ctx := context.Background() + bw := c.BulkWriter(ctx) + wantWRs := []*WriteResult{{aTime}, {aTime2}, {aTime3}, {aTime3}} + tcs := []bulkwriterTestCase{ + { + name: "Create()", + test: func(b *BulkWriter) (*BulkWriterJob, error) { + return b.Create(c.Doc("C/a"), testData) + }, + }, + { + name: "Set()", + test: func(b *BulkWriter) (*BulkWriterJob, error) { return b.Set(c.Doc("C/b"), testData) }, + }, + { + name: "Delete()", + test: func(b *BulkWriter) (*BulkWriterJob, error) { + return b.Delete(c.Doc("C/c")) + }, + }, + { + name: "Update()", + test: func(b *BulkWriter) (*BulkWriterJob, error) { + return b.Update(c.Doc("C/f"), []Update{{FieldPath: []string{"*"}, Value: 3}}) + }, + }, + } + + for i, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + j, err := tc.test(bw) + if err != nil { + t.Errorf("bulkwriter: cannot call %s for document\n", tc.name) + } + if j == nil { + t.Fatalf("bulkwriter: got nil WriteResult for call to %s\n", tc.name) + } + + bw.Flush() + + wr, err := j.Results() + if err != nil { + t.Errorf("bulkwriter:\nwanted %v,\n, got error: %v", wantWRs[i], err) + } + + if !testEqual(wr, wantWRs[i]) { + t.Errorf("bulkwriter:\nwanted %v,\n got %v\n", wantWRs[i], wr) + } + }) + } +} + +func TestBulkWriterErrors(t *testing.T) { + c, _, cleanup := newMock(t) + defer cleanup() + 
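// A cancelable context is set up below so that the last case in this test
// can cancel it, close the BulkWriter with End, and then verify that a
// further write is rejected.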
ctx, cancel := context.WithCancel(context.Background()) + b := c.BulkWriter(ctx) + + tcs := []bulkwriterTestCase{ + { + name: "empty document reference", + test: func(b *BulkWriter) (*BulkWriterJob, error) { + return b.Delete(nil) + }, + }, + { + name: "cannot write to same document twice", + test: func(b *BulkWriter) (*BulkWriterJob, error) { + b.Create(c.Doc("C/a"), testData) + return b.Delete(c.Doc("C/a")) + }, + }, + { + name: "cannot ask a closed BulkWriter to write", + test: func(b *BulkWriter) (*BulkWriterJob, error) { + cancel() + b.End() + return b.Delete(c.Doc("C/b")) + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + _, err := tc.test(b) + if err == nil { + t.Fatalf("wanted error, got nil value") + } + }) + } +} diff --git a/firestore/client.go b/firestore/client.go index 35d695a3e749..3da59ede72d7 100644 --- a/firestore/client.go +++ b/firestore/client.go @@ -290,10 +290,22 @@ func (c *Client) Collections(ctx context.Context) *CollectionIterator { } // Batch returns a WriteBatch. +// +// Deprecated: The WriteBatch API has been replaced with the transaction and +// the bulk writer API. For atomic transaction operations, use `Transaction`. +// For bulk read and write operations, use `BulkWriter`. func (c *Client) Batch() *WriteBatch { return &WriteBatch{c: c} } +// BulkWriter returns a BulkWriter instance. +// The context passed to the BulkWriter remains stored through the lifecycle +// of the object. This context allows callers to cancel BulkWriter operations. +func (c *Client) BulkWriter(ctx context.Context) *BulkWriter { + bw := newBulkWriter(ctx, c, c.path()) + return bw +} + // commit calls the Commit RPC outside of a transaction. func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit") diff --git a/firestore/go.mod b/firestore/go.mod index 49698429ce06..6e3b7ee32e47 100644 --- a/firestore/go.mod +++ b/firestore/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.8 github.com/googleapis/gax-go/v2 v2.4.0 + golang.org/x/time v0.0.0-20220609170525-579cf78fd858 google.golang.org/api v0.85.0 google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad google.golang.org/grpc v1.47.0 @@ -20,6 +21,7 @@ require ( go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect diff --git a/firestore/go.sum b/firestore/go.sum index b883b68a4ce5..7192a25bbe07 100644 --- a/firestore/go.sum +++ b/firestore/go.sum @@ -324,6 +324,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys 
v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -400,6 +401,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= +golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/firestore/integration_test.go b/firestore/integration_test.go index dfd769845b91..40598e6df1c2 100644 --- a/firestore/integration_test.go +++ b/firestore/integration_test.go @@ -1720,3 +1720,56 @@ func TestIntegration_ColGroupRefPartitionsLarge(t *testing.T) { t.Errorf("Unexpected number of documents across partitions: got %d, want %d", got, want) } } + +func TestIntegration_BulkWriter(t *testing.T) { + doc := iColl.NewDoc() + c := integrationClient(t) + ctx := context.Background() + bw := c.BulkWriter(ctx) + + f := integrationTestMap + j, err := bw.Create(doc, f) + + if err != nil { + t.Errorf("bulkwriter: error creating write to database: %v\n", err) + } + + bw.Flush() // This blocks + res, err := j.Results() // so does this + + if err != nil { + t.Errorf("bulkwriter: error getting write results: %v\n", err) + } + + if res == nil { + t.Error("bulkwriter: write attempt returned nil results") + } + + numNewWrites := 21 // 20 is the threshold at which the bundler should start sending requests + var jobs []*BulkWriterJob + + // Test a slew of writes sent at the BulkWriter + for i := 0; i < numNewWrites; i++ { + d := iColl.NewDoc() + jb, err := bw.Create(d, f) + + if err != nil { + t.Errorf("bulkwriter: error creating write to database: %v\n", err) + } + + jobs = append(jobs, jb) + } + + bw.End() // This calls Flush() in the background. + + for _, j := range jobs { + res, err = j.Results() + if err != nil { + t.Errorf("bulkwriter: error getting write results: %v\n", err) + } + + if res == nil { + t.Error("bulkwriter: write attempt returned nil results") + } + } +} diff --git a/firestore/mock_test.go b/firestore/mock_test.go index 8b333744e8dd..ef40a92953b3 100644 --- a/firestore/mock_test.go +++ b/firestore/mock_test.go @@ -227,3 +227,11 @@ func (s *mockServer) Listen(stream pb.Firestore_ListenServer) error { } return nil } + +func (s *mockServer) BatchWrite(_ context.Context, req *pb.BatchWriteRequest) (*pb.BatchWriteResponse, error) { + res, err := s.popRPC(req) + if err != nil { + return nil, err + } + return res.(*pb.BatchWriteResponse), nil +} diff --git a/firestore/writebatch.go b/firestore/writebatch.go index aaa035738d13..e1de0760ac37 100644 --- a/firestore/writebatch.go +++ b/firestore/writebatch.go @@ -26,6 +26,10 @@ import ( // Update and Delete methods, then run it with the Commit method. 
Errors in Create, // Set, Update or Delete are recorded instead of being returned immediately. The // first such error is returned by Commit. +// +// Deprecated: The WriteBatch API has been replaced with the transaction and +// the bulk writer API. For atomic transaction operations, use `Transaction`. +// For bulk read and write operations, use `BulkWriter`. type WriteBatch struct { c *Client err error diff --git a/internal/gapicgen/cmd/genbot/Dockerfile b/internal/gapicgen/cmd/genbot/Dockerfile index 9f1261f16328..cb929d12b84a 100644 --- a/internal/gapicgen/cmd/genbot/Dockerfile +++ b/internal/gapicgen/cmd/genbot/Dockerfile @@ -27,7 +27,7 @@ RUN go install github.com/golang/protobuf/protoc-gen-go@v1.5.2 && \ go install golang.org/x/lint/golint@latest && \ go install golang.org/x/tools/cmd/goimports@latest && \ go install honnef.co/go/tools/cmd/staticcheck@latest && \ - go install github.com/googleapis/gapic-generator-go/cmd/protoc-gen-go_gapic@v0.31.0 + go install github.com/googleapis/gapic-generator-go/cmd/protoc-gen-go_gapic@v0.31.2 ENV PATH="${PATH}:/root/go/bin" # Source: http://debuggable.com/posts/disable-strict-host-checking-for-git-clone:49896ff3-0ac0-4263-9703-1eae4834cda3 diff --git a/profiler/integration_test.go b/profiler/integration_test.go index cca1ed590388..e0a84db1ecef 100644 --- a/profiler/integration_test.go +++ b/profiler/integration_test.go @@ -118,13 +118,17 @@ echo "{{.FinishString}}" {{ define "integration_backoff" -}} {{- template "prologue" . }} {{- template "setup" . }} + +# Compile first so each spawned process can just use the same binary. +go build busybench.go + # Do not display commands being run to simplify logging output. set +x # Run benchmarks with agent. echo "Starting {{.NumBackoffBenchmarks}} benchmarks." for (( i = 0; i < {{.NumBackoffBenchmarks}}; i++ )); do - (go run busybench.go --service="{{.Service}}" --duration={{.DurationSec}} \ + (./busybench --service="{{.Service}}" --duration={{.DurationSec}} \ --num_busyworkers=1) |& while read line; \ do echo "benchmark $i: ${line}"; done & done diff --git a/profiler/kokoro/integration_test.sh b/profiler/kokoro/integration_test.sh index 8a655dc23316..8d532de09160 100755 --- a/profiler/kokoro/integration_test.sh +++ b/profiler/kokoro/integration_test.sh @@ -44,14 +44,9 @@ export GIMME_GO_VERSION=1.18.4 export GIMME_ENV_PREFIX=/tmp/gimme_envs install_go() { "$GIMME" - # If gimme was successful, an .env script that sets up proper environment - # variables will be created. - if [[ -f "${GIMME_ENV_PREFIX}/go${GIMME_GO_VERSION}.env" ]] - then - source "${GIMME_ENV_PREFIX}/go${GIMME_GO_VERSION}.env" - else - return 1 - fi + # If gimme fails, this file will not exist, source will fail, and install_go + # will be retried.
+ source "${GIMME_ENV_PREFIX}/go${GIMME_GO_VERSION}.env" } retry install_go diff --git a/storage/.release-please-manifest.json b/storage/.release-please-manifest.json index 110758e0c145..7e1a2feb7072 100644 --- a/storage/.release-please-manifest.json +++ b/storage/.release-please-manifest.json @@ -1,3 +1,3 @@ { - "storage": "1.23.0" + "storage": "1.24.0" } \ No newline at end of file diff --git a/storage/CHANGES.md b/storage/CHANGES.md index 4a80b8ff4cfe..75b3f62deb89 100644 --- a/storage/CHANGES.md +++ b/storage/CHANGES.md @@ -1,6 +1,13 @@ # Changes +## [1.24.0](https://github.com/googleapis/google-cloud-go/compare/storage/v1.23.0...storage/v1.24.0) (2022-07-20) + + +### Features + +* **storage:** add Custom Placement Config Dual Region Support ([#6294](https://github.com/googleapis/google-cloud-go/issues/6294)) ([5a8c607](https://github.com/googleapis/google-cloud-go/commit/5a8c607e3a9a3265887e27cb13f8943f3e3fa23d)) + ## [1.23.0](https://github.com/googleapis/google-cloud-go/compare/storage/v1.22.1...storage/v1.23.0) (2022-06-23) diff --git a/storage/client.go b/storage/client.go index 7117d5820d79..ef443b755853 100644 --- a/storage/client.go +++ b/storage/client.go @@ -278,34 +278,33 @@ type newRangeReaderParams struct { type composeObjectRequest struct { dstBucket string - dstObject composeDstObject - srcs []composeSrcObject + dstObject destinationObject + srcs []sourceObject predefinedACL string encryptionKey []byte sendCRC32C bool } -type composeSrcObject struct { - name string - gen int64 - conds *Conditions +type sourceObject struct { + name string + bucket string + gen int64 + conds *Conditions + encryptionKey []byte } -type composeDstObject struct { - name string - conds *Conditions - attrs *ObjectAttrs // attrs to set on the destination object. +type destinationObject struct { + name string + bucket string + conds *Conditions + attrs *ObjectAttrs // attrs to set on the destination object. + encryptionKey []byte + keyName string } type rewriteObjectRequest struct { - srcBucket string - srcObject string - dstBucket string - dstObject string - dstKeyName string - attrs *ObjectAttrs - gen int64 - conds *Conditions + srcObject sourceObject + dstObject destinationObject predefinedACL string token string } diff --git a/storage/client_test.go b/storage/client_test.go index cb667e30f037..a32d5b9ef007 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -276,6 +276,51 @@ func TestGetObjectEmulated(t *testing.T) { }) } +func TestRewriteObjectEmulated(t *testing.T) { + transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { + // Populate test object. 
+ _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + Name: bucket, + }) + if err != nil { + t.Fatalf("client.CreateBucket: %v", err) + } + src := ObjectAttrs{ + Bucket: bucket, + Name: fmt.Sprintf("testObject-%d", time.Now().Nanosecond()), + } + w := veneerClient.Bucket(bucket).Object(src.Name).NewWriter(context.Background()) + if _, err := w.Write(randomBytesToWrite); err != nil { + t.Fatalf("failed to populate test object: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("closing object: %v", err) + } + req := &rewriteObjectRequest{ + dstObject: destinationObject{ + bucket: bucket, + name: fmt.Sprintf("copy-of-%s", src.Name), + attrs: &ObjectAttrs{}, + }, + srcObject: sourceObject{ + bucket: bucket, + name: src.Name, + gen: defaultGen, + }, + } + got, err := client.RewriteObject(context.Background(), req) + if err != nil { + t.Fatal(err) + } + if !got.done { + t.Fatal("didn't finish writing!") + } + if want := int64(len(randomBytesToWrite)); got.written != want { + t.Errorf("Bytes written: got %d, want %d", got.written, want) + } + }) +} + func TestUpdateObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. @@ -986,11 +1031,11 @@ func TestComposeEmulated(t *testing.T) { dstName := fmt.Sprintf("%d-object3", prefix) req := composeObjectRequest{ dstBucket: bucket, - dstObject: composeDstObject{ + dstObject: destinationObject{ name: dstName, attrs: &ObjectAttrs{StorageClass: "COLDLINE"}, }, - srcs: []composeSrcObject{ + srcs: []sourceObject{ {name: srcNames[0]}, {name: srcNames[1]}, }, diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 4bf8f6baefb2..110baebb86fb 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -766,7 +766,56 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec return newObjectFromProto(obj), nil } func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { - return nil, errMethodNotSupported + s := callSettings(c.settings, opts...) + obj := req.dstObject.attrs.toProtoObject("") + call := &storagepb.RewriteObjectRequest{ + SourceBucket: bucketResourceName(globalProjectAlias, req.srcObject.bucket), + SourceObject: req.srcObject.name, + RewriteToken: req.token, + DestinationBucket: bucketResourceName(globalProjectAlias, req.dstObject.bucket), + DestinationName: req.dstObject.name, + Destination: obj, + DestinationKmsKey: req.dstObject.keyName, + DestinationPredefinedAcl: req.predefinedACL, + } + + // The userProject, whether source or destination project, is decided by the code calling the interface. 
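// Note that over gRPC the billing project is carried as request metadata on
// the context (setUserProjectMetadata below), while the HTTP implementation
// attaches it to the call via call.UserProject.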
+ if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil { + return nil, err + } + if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil { + return nil, err + } + + if len(req.dstObject.encryptionKey) > 0 { + call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey) + } + if len(req.srcObject.encryptionKey) > 0 { + srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey) + call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm() + call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes() + call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes() + } + var res *storagepb.RewriteResponse + var err error + + retryCall := func() error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err } + + if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)); err != nil { + return nil, err + } + + r := &rewriteObjectResponse{ + done: res.GetDone(), + written: res.GetTotalBytesRewritten(), + token: res.GetRewriteToken(), + resource: newObjectFromProto(res.GetResource()), + } + + return r, nil } func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { diff --git a/storage/http_client.go b/storage/http_client.go index a1c26cda81a1..ffa2e3699436 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -716,7 +716,55 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec return newObject(obj), nil } func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { - return nil, errMethodNotSupported + s := callSettings(c.settings, opts...) + rawObject := req.dstObject.attrs.toRawObject("") + call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject) + + call.Context(ctx).Projection("full") + if req.token != "" { + call.RewriteToken(req.token) + } + if req.dstObject.keyName != "" { + call.DestinationKmsKeyName(req.dstObject.keyName) + } + if req.predefinedACL != "" { + call.DestinationPredefinedAcl(req.predefinedACL) + } + if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil { + return nil, err + } + if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil { + return nil, err + } + if s.userProject != "" { + call.UserProject(s.userProject) + } + // Set destination encryption headers. + if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil { + return nil, err + } + // Set source encryption headers. 
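// The final boolean argument distinguishes the copy-source encryption
// headers from the destination encryption headers set just above, so the
// two customer-supplied keys are kept separate on the same request.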
+ if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil { + return nil, err + } + var res *raw.RewriteResponse + var err error + setClientHeader(call.Header()) + + retryCall := func() error { res, err = call.Do(); return err } + + if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + return nil, err + } + + r := &rewriteObjectResponse{ + done: res.Done, + written: res.TotalBytesRewritten, + token: res.RewriteToken, + resource: newObject(res.Resource), + } + + return r, nil } func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { diff --git a/storage/integration_test.go b/storage/integration_test.go index 20af97cb8e12..30b7765b9357 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2362,7 +2362,7 @@ func TestIntegration_ACL(t *testing.T) { for _, obj := range aclObjects { c := randomContents() if err := writeObject(ctx, bkt.Object(obj), "", c); err != nil { - t.Errorf("Write for %v failed with %v", obj, err) + return fmt.Errorf("Write for %v failed with %v", obj, err) } } acl, err = o.ACL().List(ctx) @@ -2377,7 +2377,7 @@ func TestIntegration_ACL(t *testing.T) { return nil }) if err != nil { - t.Error(err) + t.Fatal(err) } if err := o.ACL().Delete(ctx, entity); err != nil { t.Errorf("object ACL: could not delete entity %s", entity) @@ -4791,7 +4791,7 @@ func (h testHelper) mustNewReader(obj *ObjectHandle) *Reader { } func writeObject(ctx context.Context, obj *ObjectHandle, contentType string, contents []byte) error { - w := obj.NewWriter(ctx) + w := obj.Retryer(WithPolicy(RetryAlways)).NewWriter(ctx) w.ContentType = contentType w.CacheControl = "public, max-age=60" if contents != nil { diff --git a/storage/internal/version.go b/storage/internal/version.go index 639553700322..291a237fe1cd 100644 --- a/storage/internal/version.go +++ b/storage/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.23.0" +const Version = "1.24.0" diff --git a/storage/storage.go b/storage/storage.go index 79ab04f60cd9..d6634e3dbca1 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1208,8 +1208,11 @@ func (o *ObjectAttrs) toProtoObject(b string) *storagepb.Object { } // For now, there are only globally unique buckets, and "_" is the alias - // project ID for such buckets. - b = bucketResourceName("_", b) + // project ID for such buckets. If the bucket is not provided, like in the + // destination ObjectAttrs of a Copy, do not attempt to format it. 
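// For example (bucket name assumed), bucketResourceName(globalProjectAlias,
// "my-bucket") yields "projects/_/buckets/my-bucket".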
+ if b != "" { + b = bucketResourceName(globalProjectAlias, b) + } return &storagepb.Object{ Bucket: b, @@ -1838,6 +1841,33 @@ func applySourceConds(gen int64, conds *Conditions, call *raw.ObjectsRewriteCall return nil } +func applySourceCondsProto(gen int64, conds *Conditions, call *storagepb.RewriteObjectRequest) error { + if gen >= 0 { + call.SourceGeneration = gen + } + if conds == nil { + return nil + } + if err := conds.validate("CopyTo source"); err != nil { + return err + } + switch { + case conds.GenerationMatch != 0: + call.IfSourceGenerationMatch = proto.Int64(conds.GenerationMatch) + case conds.GenerationNotMatch != 0: + call.IfSourceGenerationNotMatch = proto.Int64(conds.GenerationNotMatch) + case conds.DoesNotExist: + call.IfSourceGenerationMatch = proto.Int64(0) + } + switch { + case conds.MetagenerationMatch != 0: + call.IfSourceMetagenerationMatch = proto.Int64(conds.MetagenerationMatch) + case conds.MetagenerationNotMatch != 0: + call.IfSourceMetagenerationNotMatch = proto.Int64(conds.MetagenerationNotMatch) + } + return nil +} + // setConditionField sets a field on a *raw.WhateverCall. // We can't use anonymous interfaces because the return type is // different, since the field setters are builders. diff --git a/vmmigration/CHANGES.md b/vmmigration/CHANGES.md index 4f65c88dc798..84bbb28b8228 100644 --- a/vmmigration/CHANGES.md +++ b/vmmigration/CHANGES.md @@ -1,6 +1,13 @@ # Changes +## [1.1.0](https://github.com/googleapis/google-cloud-go/compare/vmmigration/v1.0.0...vmmigration/v1.1.0) (2022-07-19) + + +### Features + +* **vmmigration:** Rename product feat: API updates ([53246aa](https://github.com/googleapis/google-cloud-go/commit/53246aa18cb9c79471ecc84878b5e3f166086404)) + ## [1.0.0](https://github.com/googleapis/google-cloud-go/compare/vmmigration/v0.3.0...vmmigration/v1.0.0) (2022-06-29) diff --git a/vmmigration/internal/version.go b/vmmigration/internal/version.go index db6d2e3e99d1..6aea1adc121f 100644 --- a/vmmigration/internal/version.go +++ b/vmmigration/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.0.0" +const Version = "1.1.0" From 8fb7ed924a4ed13dc2520c2011eddcf22ac9ced3 Mon Sep 17 00:00:00 2001 From: Noah Dietz Date: Thu, 21 Jul 2022 15:56:11 -0700 Subject: [PATCH 2/6] Revert "chore: sync main to storage-refactor" (#6393) Accidentally used squash. 
Reverts googleapis/google-cloud-go#6386 --- .release-please-manifest-submodules.json | 6 +- assuredworkloads/CHANGES.md | 8 - assuredworkloads/internal/version.go | 2 +- .../storage/managedwriter/integration_test.go | 13 +- dataplex/CHANGES.md | 7 - dataplex/internal/version.go | 2 +- firestore/bulkwriter.go | 330 ------------------ firestore/bulkwriter_test.go | 239 ------------- firestore/client.go | 12 - firestore/go.mod | 2 - firestore/go.sum | 3 - firestore/integration_test.go | 53 --- firestore/mock_test.go | 8 - firestore/writebatch.go | 4 - internal/gapicgen/cmd/genbot/Dockerfile | 2 +- profiler/integration_test.go | 6 +- profiler/kokoro/integration_test.sh | 11 +- storage/.release-please-manifest.json | 2 +- storage/CHANGES.md | 7 - storage/client.go | 35 +- storage/client_test.go | 49 +-- storage/grpc_client.go | 51 +-- storage/http_client.go | 50 +-- storage/integration_test.go | 6 +- storage/internal/version.go | 2 +- storage/storage.go | 34 +- vmmigration/CHANGES.md | 7 - vmmigration/internal/version.go | 2 +- 28 files changed, 56 insertions(+), 897 deletions(-) delete mode 100644 firestore/bulkwriter.go delete mode 100644 firestore/bulkwriter_test.go diff --git a/.release-please-manifest-submodules.json b/.release-please-manifest-submodules.json index 2fcae63c2811..d1a56c25948b 100644 --- a/.release-please-manifest-submodules.json +++ b/.release-please-manifest-submodules.json @@ -9,7 +9,7 @@ "area120": "0.4.0", "artifactregistry": "1.3.0", "asset": "1.3.0", - "assuredworkloads": "1.2.0", + "assuredworkloads": "1.1.0", "automl": "1.4.0", "baremetalsolution": "0.2.0", "batch": "0.1.0", @@ -30,7 +30,7 @@ "dataform": "0.1.0", "datafusion": "1.3.0", "datalabeling": "0.3.0", - "dataplex": "1.1.0", + "dataplex": "1.0.0", "dataproc": "1.5.0", "dataqna": "0.4.0", "datastream": "1.0.0", @@ -101,7 +101,7 @@ "video": "1.7.0", "videointelligence": "1.4.0", "vision/v2": "2.0.0", - "vmmigration": "1.1.0", + "vmmigration": "1.0.0", "vpcaccess": "1.2.0", "webrisk": "1.3.0", "websecurityscanner": "1.2.0", diff --git a/assuredworkloads/CHANGES.md b/assuredworkloads/CHANGES.md index 6532a92194fb..2404350609b6 100644 --- a/assuredworkloads/CHANGES.md +++ b/assuredworkloads/CHANGES.md @@ -1,14 +1,6 @@ # Changes -## [1.2.0](https://github.com/googleapis/google-cloud-go/compare/assuredworkloads/v1.1.0...assuredworkloads/v1.2.0) (2022-07-19) - - -### Features - -* **assuredworkloads:** Updated the method signature of analyzeWorkloadMove for v1beta API ([53246aa](https://github.com/googleapis/google-cloud-go/commit/53246aa18cb9c79471ecc84878b5e3f166086404)) -* **assuredworkloads:** Updated the method signature of analyzeWorkloadMove for v1beta API to accept project as source. AnalyzeWorkloadMove now also returns information about org policy differences between the project and target folder ([53246aa](https://github.com/googleapis/google-cloud-go/commit/53246aa18cb9c79471ecc84878b5e3f166086404)) - ## [1.1.0](https://github.com/googleapis/google-cloud-go/compare/assuredworkloads/v1.0.0...assuredworkloads/v1.1.0) (2022-07-12) diff --git a/assuredworkloads/internal/version.go b/assuredworkloads/internal/version.go index d0e4a6fd16b3..6aea1adc121f 100644 --- a/assuredworkloads/internal/version.go +++ b/assuredworkloads/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. 
-const Version = "1.2.0" +const Version = "1.1.0" diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index e252c5bdb8fb..4043d32bddad 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "strings" "sync" "testing" "time" @@ -519,10 +520,18 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie if !ok { t.Errorf("GetResult error was not an instance of ApiError") } - status := apiErr.GRPCStatus() - if status.Code() != codes.InvalidArgument { + if status := apiErr.GRPCStatus(); status.Code() != codes.InvalidArgument { t.Errorf("expected InvalidArgument status, got %v", status) } + + details := apiErr.Details() + if details.DebugInfo == nil { + t.Errorf("expected DebugInfo to be populated, was nil") + } + wantSubstring := "Message size exceed the limitation of byte based flow control." + if detail := details.DebugInfo.GetDetail(); !strings.Contains(detail, wantSubstring) { + t.Errorf("detail missing desired substring: %s", detail) + } } // send a subsequent append as verification we can proceed. result, err = ms.AppendRows(ctx, [][]byte{b}) diff --git a/dataplex/CHANGES.md b/dataplex/CHANGES.md index 338eebd24e46..ad2bd7a4db32 100644 --- a/dataplex/CHANGES.md +++ b/dataplex/CHANGES.md @@ -1,13 +1,6 @@ # Changes -## [1.1.0](https://github.com/googleapis/google-cloud-go/compare/dataplex/v1.0.0...dataplex/v1.1.0) (2022-07-19) - - -### Features - -* **dataplex:** Add IAM support for Explore content APIs feat: Add support for custom container for Task feat: Add support for cross project for Task feat: Add support for custom encryption key to be used for encrypt data on the PDs associated with the VMs in your Dataproc cluster for Task feat: Add support for Latest job in Task resource feat: User mode filter in Explore list sessions API feat: Support logging sampled file paths per partition to Cloud logging for Discovery event ([8b17366](https://github.com/googleapis/google-cloud-go/commit/8b17366c46bbd8a0b2adf39ec3b058eb83192933)) - ## [1.0.0](https://github.com/googleapis/google-cloud-go/compare/dataplex/v0.4.0...dataplex/v1.0.0) (2022-06-29) diff --git a/dataplex/internal/version.go b/dataplex/internal/version.go index 6aea1adc121f..db6d2e3e99d1 100644 --- a/dataplex/internal/version.go +++ b/dataplex/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. 
-const Version = "1.1.0" +const Version = "1.0.0" diff --git a/firestore/bulkwriter.go b/firestore/bulkwriter.go deleted file mode 100644 index be35be8614c6..000000000000 --- a/firestore/bulkwriter.go +++ /dev/null @@ -1,330 +0,0 @@ -package firestore - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - vkit "cloud.google.com/go/firestore/apiv1" - "golang.org/x/time/rate" - "google.golang.org/api/support/bundler" - pb "google.golang.org/genproto/googleapis/firestore/v1" -) - -const ( - // maxBatchSize is the max number of writes to send in a request - maxBatchSize = 20 - // maxRetryAttempts is the max number of times to retry a write - maxRetryAttempts = 10 - // defaultStartingMaximumOpsPerSecond is the starting max number of requests to the service per second - defaultStartingMaximumOpsPerSecond = 500 - // maxWritesPerSecond is the starting limit of writes allowed to callers per second - maxWritesPerSecond = maxBatchSize * defaultStartingMaximumOpsPerSecond -) - -// bulkWriterResult contains the WriteResult or error results from an individual -// write to the database. -type bulkWriterResult struct { - result *pb.WriteResult // (cached) result from the operation - err error // (cached) any errors that occurred -} - -// BulkWriterJob provides read-only access to the results of a BulkWriter write attempt. -type BulkWriterJob struct { - resultChan chan bulkWriterResult // send errors and results to this channel - write *pb.Write // the writes to apply to the database - attempts int // number of times this write has been attempted - resultsLock sync.Mutex // guards the cached wr and e values for the job - result *WriteResult // (cached) result from the operation - err error // (cached) any errors that occurred - ctx context.Context // context for canceling/timing out results -} - -// Results gets the results of the BulkWriter write attempt. -// This method blocks if the results for this BulkWriterJob haven't been -// received. -func (j *BulkWriterJob) Results() (*WriteResult, error) { - j.resultsLock.Lock() - defer j.resultsLock.Unlock() - if j.result == nil && j.err == nil { - j.result, j.err = j.processResults() // cache the results for additional calls - } - return j.result, j.err -} - -// processResults checks for errors returned from send() and packages up the -// results as WriteResult objects -func (j *BulkWriterJob) processResults() (*WriteResult, error) { - select { - case <-j.ctx.Done(): - return nil, j.ctx.Err() - case bwr := <-j.resultChan: - if bwr.err != nil { - return nil, bwr.err - } - return writeResultFromProto(bwr.result) - } -} - -// setError ensures that an error is returned on the error channel of BulkWriterJob. -func (j *BulkWriterJob) setError(e error) { - bwr := bulkWriterResult{ - err: e, - result: nil, - } - j.resultChan <- bwr - close(j.resultChan) -} - -// A BulkWriter supports concurrent writes to multiple documents. The BulkWriter -// submits document writes in maximum batches of 20 writes per request. Each -// request can contain many different document writes: create, delete, update, -// and set are all supported. -// -// Only one operation (create, set, update, delete) per document is allowed. -// BulkWriter cannot promise atomicity: individual writes can fail or succeed -// independent of each other. Bulkwriter does not apply writes in any set order; -// thus a document can't have set on it immediately after creation. 
-type BulkWriter struct { - database string // the database as resource name: projects/[PROJECT]/databases/[DATABASE] - start time.Time // when this BulkWriter was started; used to calculate qps and rate increases - vc *vkit.Client // internal client - maxOpsPerSecond int // number of requests that can be sent per second - docUpdatePaths map[string]bool // document paths with corresponding writes in the queue - limiter rate.Limiter // limit requests to server to <= 500 qps - bundler *bundler.Bundler // handle bundling up writes to Firestore - ctx context.Context // context for canceling all BulkWriter operations - isOpenLock sync.RWMutex // guards against setting isOpen concurrently - isOpen bool // flag that the BulkWriter is closed -} - -// newBulkWriter creates a new instance of the BulkWriter. -func newBulkWriter(ctx context.Context, c *Client, database string) *BulkWriter { - // Although typically we shouldn't store Context objects, in this case we - // need to pass this Context through to the Bundler handler. - ctx = withResourceHeader(ctx, c.path()) - - bw := &BulkWriter{ - database: database, - start: time.Now(), - vc: c.c, - isOpen: true, - maxOpsPerSecond: defaultStartingMaximumOpsPerSecond, - docUpdatePaths: make(map[string]bool), - ctx: ctx, - limiter: *rate.NewLimiter(rate.Limit(maxWritesPerSecond), 1), - } - - // can't initialize within struct above; need instance reference to BulkWriter.send() - bw.bundler = bundler.NewBundler(&BulkWriterJob{}, bw.send) - bw.bundler.HandlerLimit = bw.maxOpsPerSecond - bw.bundler.BundleCountThreshold = maxBatchSize - - return bw -} - -// End sends all enqueued writes in parallel and closes the BulkWriter to new requests. -// After calling End(), calling any additional method automatically returns -// with an error. This method completes when there are no more pending writes -// in the queue. -func (bw *BulkWriter) End() { - bw.isOpenLock.Lock() - bw.isOpen = false - bw.isOpenLock.Unlock() - bw.Flush() -} - -// Flush commits all writes that have been enqueued up to this point in parallel. -// This method blocks execution. -func (bw *BulkWriter) Flush() { - bw.bundler.Flush() -} - -// Create adds a document creation write to the queue of writes to send. -// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. -func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJob, error) { - bw.isOpenLock.RLock() - defer bw.isOpenLock.RUnlock() - err := bw.checkWriteConditions(doc) - if err != nil { - return nil, err - } - - w, err := doc.newCreateWrites(datum) - if err != nil { - return nil, fmt.Errorf("firestore: cannot create %v with %v", doc.ID, datum) - } - - if len(w) > 1 { - return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter") - } - - j := bw.write(w[0]) - return j, nil -} - -// Delete adds a document deletion write to the queue of writes to send. -// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. 
-func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkWriterJob, error) { - bw.isOpenLock.RLock() - defer bw.isOpenLock.RUnlock() - err := bw.checkWriteConditions(doc) - if err != nil { - return nil, err - } - - w, err := doc.newDeleteWrites(preconds) - if err != nil { - return nil, fmt.Errorf("firestore: cannot delete doc %v", doc.ID) - } - - if len(w) > 1 { - return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter") - } - - j := bw.write(w[0]) - return j, nil -} - -// Set adds a document set write to the queue of writes to send. -// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. -func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption) (*BulkWriterJob, error) { - bw.isOpenLock.RLock() - defer bw.isOpenLock.RUnlock() - err := bw.checkWriteConditions(doc) - if err != nil { - return nil, err - } - - w, err := doc.newSetWrites(datum, opts) - if err != nil { - return nil, fmt.Errorf("firestore: cannot set %v on doc %v", datum, doc.ID) - } - - if len(w) > 1 { - return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter") - } - - j := bw.write(w[0]) - return j, nil -} - -// Update adds a document update write to the queue of writes to send. -// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. -func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Precondition) (*BulkWriterJob, error) { - bw.isOpenLock.RLock() - defer bw.isOpenLock.RUnlock() - err := bw.checkWriteConditions(doc) - if err != nil { - return nil, err - } - - w, err := doc.newUpdatePathWrites(updates, preconds) - if err != nil { - return nil, fmt.Errorf("firestore: cannot update doc %v", doc.ID) - } - - if len(w) > 1 { - return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter") - } - - j := bw.write(w[0]) - return j, nil -} - -// checkConditions determines whether this write attempt is valid. It returns -// an error if either the BulkWriter has already been closed or if it -// receives a nil document reference. -func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error { - if !bw.isOpen { - return errors.New("firestore: BulkWriter has been closed") - } - - if doc == nil { - return errors.New("firestore: nil document contents") - } - - _, havePath := bw.docUpdatePaths[doc.shortPath] - if havePath { - return fmt.Errorf("firestore: BulkWriter received duplicate write for path: %v", doc.shortPath) - } - - bw.docUpdatePaths[doc.shortPath] = true - - return nil -} - -// write packages up write requests into bulkWriterJob objects. -func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob { - - j := &BulkWriterJob{ - resultChan: make(chan bulkWriterResult, 1), - write: w, - ctx: bw.ctx, - } - - bw.limiter.Wait(bw.ctx) - // ignore operation size constraints and related errors; can't be inferred at compile time - // Bundler is set to accept an unlimited amount of bytes - _ = bw.bundler.Add(j, 0) - - return j -} - -// send transmits writes to the service and matches response results to job channels. 
-func (bw *BulkWriter) send(i interface{}) { - bwj := i.([]*BulkWriterJob) - - if len(bwj) == 0 { - return - } - - var ws []*pb.Write - for _, w := range bwj { - ws = append(ws, w.write) - } - - bwr := &pb.BatchWriteRequest{ - Database: bw.database, - Writes: ws, - Labels: map[string]string{}, - } - - select { - case <-bw.ctx.Done(): - return - default: - resp, err := bw.vc.BatchWrite(bw.ctx, bwr) - if err != nil { - // Do we need to be selective about what kind of errors we send? - for _, j := range bwj { - j.setError(err) - } - return - } - // Match write results with BulkWriterJob objects - for i, res := range resp.WriteResults { - s := resp.Status[i] - c := s.GetCode() - if c != 0 { // Should we do an explicit check against rpc.Code enum? - j := bwj[i] - j.attempts++ - - // Do we need separate retry bundler? - if j.attempts < maxRetryAttempts { - // ignore operation size constraints and related errors; job size can't be inferred at compile time - // Bundler is set to accept an unlimited amount of bytes - _ = bw.bundler.Add(j, 0) - } else { - j.setError(fmt.Errorf("firestore: write failed with status: %v", s)) - } - continue - } - - bwj[i].resultChan <- bulkWriterResult{err: nil, result: res} - close(bwj[i].resultChan) - } - } -} diff --git a/firestore/bulkwriter_test.go b/firestore/bulkwriter_test.go deleted file mode 100644 index 3349e3b7089e..000000000000 --- a/firestore/bulkwriter_test.go +++ /dev/null @@ -1,239 +0,0 @@ -package firestore - -import ( - "context" - "testing" - - pb "google.golang.org/genproto/googleapis/firestore/v1" - "google.golang.org/genproto/googleapis/rpc/status" - "google.golang.org/grpc/codes" -) - -type bulkwriterTestCase struct { - name string - test func(*BulkWriter) (*BulkWriterJob, error) -} - -// setupMockServer adds expected write requests and correct mocked responses -// to the mockServer. 
-func setupMockServer(c *Client, docPrefix string, srv *mockServer) { - // Create - srv.addRPC( - &pb.BatchWriteRequest{ - Database: c.path(), - Writes: []*pb.Write{ - { - Operation: &pb.Write_Update{ - Update: &pb.Document{ - Name: docPrefix + "a", - Fields: testFields, - }, - }, - CurrentDocument: &pb.Precondition{ - ConditionType: &pb.Precondition_Exists{ - Exists: false, - }, - }, - }, - }, - }, - &pb.BatchWriteResponse{ - WriteResults: []*pb.WriteResult{ - {UpdateTime: aTimestamp}, - }, - Status: []*status.Status{ - { - Code: int32(codes.OK), - Message: "create test successful", - }, - }, - }, - ) - - // Set - srv.addRPC( - &pb.BatchWriteRequest{ - Database: c.path(), - Writes: []*pb.Write{ - { - Operation: &pb.Write_Update{ - Update: &pb.Document{ - Name: docPrefix + "b", - Fields: testFields, - }, - }, - }, - }, - }, - &pb.BatchWriteResponse{ - WriteResults: []*pb.WriteResult{ - {UpdateTime: aTimestamp2}, - }, - Status: []*status.Status{ - { - Code: int32(codes.OK), - Message: "set test successful", - }, - }, - }, - ) - - // Delete - srv.addRPC( - &pb.BatchWriteRequest{ - Database: c.path(), - Writes: []*pb.Write{ - { - Operation: &pb.Write_Delete{ - Delete: docPrefix + "c", - }, - }, - }, - }, - &pb.BatchWriteResponse{ - WriteResults: []*pb.WriteResult{ - {UpdateTime: aTimestamp3}, - }, - Status: []*status.Status{ - { - Code: int32(codes.OK), - Message: "delete test successful", - }, - }, - }, - ) - - // Update - srv.addRPC( - &pb.BatchWriteRequest{ - Database: c.path(), - Writes: []*pb.Write{ - { - Operation: &pb.Write_Update{ - Update: &pb.Document{ - Name: docPrefix + "f", - Fields: map[string]*pb.Value{"*": intval(3)}, - }, - }, - UpdateMask: &pb.DocumentMask{FieldPaths: []string{"`*`"}}, - CurrentDocument: &pb.Precondition{ - ConditionType: &pb.Precondition_Exists{ - Exists: true, - }, - }, - }, - }, - }, - &pb.BatchWriteResponse{ - WriteResults: []*pb.WriteResult{ - {UpdateTime: aTimestamp3}, - }, - Status: []*status.Status{ - { - Code: int32(codes.OK), - Message: "update test successful", - }, - }, - }, - ) -} - -func TestBulkWriter(t *testing.T) { - c, srv, cleanup := newMock(t) - defer cleanup() - - docPrefix := c.Collection("C").Path + "/" - - setupMockServer(c, docPrefix, srv) - - ctx := context.Background() - bw := c.BulkWriter(ctx) - wantWRs := []*WriteResult{{aTime}, {aTime2}, {aTime3}, {aTime3}} - tcs := []bulkwriterTestCase{ - { - name: "Create()", - test: func(b *BulkWriter) (*BulkWriterJob, error) { - return b.Create(c.Doc("C/a"), testData) - }, - }, - { - name: "Set()", - test: func(b *BulkWriter) (*BulkWriterJob, error) { return b.Set(c.Doc("C/b"), testData) }, - }, - { - name: "Delete()", - test: func(b *BulkWriter) (*BulkWriterJob, error) { - return b.Delete(c.Doc("C/c")) - }, - }, - { - name: "Update()", - test: func(b *BulkWriter) (*BulkWriterJob, error) { - return b.Update(c.Doc("C/f"), []Update{{FieldPath: []string{"*"}, Value: 3}}) - }, - }, - } - - for i, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - j, err := tc.test(bw) - if err != nil { - t.Errorf("bulkwriter: cannot call %s for document\n", tc.name) - } - if j == nil { - t.Fatalf("bulkwriter: got nil WriteResult for call to %s\n", tc.name) - } - - bw.Flush() - - wr, err := j.Results() - if err != nil { - t.Errorf("bulkwriter:\nwanted %v,\n, got error: %v", wantWRs[i], err) - } - - if !testEqual(wr, wantWRs[i]) { - t.Errorf("bulkwriter:\nwanted %v,\n got %v\n", wantWRs[i], wr) - } - }) - } -} - -func TestBulkWriterErrors(t *testing.T) { - c, _, cleanup := newMock(t) - defer cleanup() - 
ctx, cancel := context.WithCancel(context.Background()) - b := c.BulkWriter(ctx) - - tcs := []bulkwriterTestCase{ - { - name: "empty document reference", - test: func(b *BulkWriter) (*BulkWriterJob, error) { - return b.Delete(nil) - }, - }, - { - name: "cannot write to same document twice", - test: func(b *BulkWriter) (*BulkWriterJob, error) { - b.Create(c.Doc("C/a"), testData) - return b.Delete(c.Doc("C/a")) - }, - }, - { - name: "cannot ask a closed BulkWriter to write", - test: func(b *BulkWriter) (*BulkWriterJob, error) { - cancel() - b.End() - return b.Delete(c.Doc("C/b")) - }, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - _, err := tc.test(b) - if err == nil { - t.Fatalf("wanted error, got nil value") - } - }) - } -} diff --git a/firestore/client.go b/firestore/client.go index 3da59ede72d7..35d695a3e749 100644 --- a/firestore/client.go +++ b/firestore/client.go @@ -290,22 +290,10 @@ func (c *Client) Collections(ctx context.Context) *CollectionIterator { } // Batch returns a WriteBatch. -// -// Deprecated: The WriteBatch API has been replaced with the transaction and -// the bulk writer API. For atomic transaction operations, use `Transaction`. -// For bulk read and write operations, use `BulkWriter`. func (c *Client) Batch() *WriteBatch { return &WriteBatch{c: c} } -// BulkWriter returns a BulkWriter instance. -// The context passed to the BulkWriter remains stored through the lifecycle -// of the object. This context allows callers to cancel BulkWriter operations. -func (c *Client) BulkWriter(ctx context.Context) *BulkWriter { - bw := newBulkWriter(ctx, c, c.path()) - return bw -} - // commit calls the Commit RPC outside of a transaction. func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit") diff --git a/firestore/go.mod b/firestore/go.mod index 6e3b7ee32e47..49698429ce06 100644 --- a/firestore/go.mod +++ b/firestore/go.mod @@ -7,7 +7,6 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.8 github.com/googleapis/gax-go/v2 v2.4.0 - golang.org/x/time v0.0.0-20220609170525-579cf78fd858 google.golang.org/api v0.85.0 google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad google.golang.org/grpc v1.47.0 @@ -21,7 +20,6 @@ require ( go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect - golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect diff --git a/firestore/go.sum b/firestore/go.sum index 7192a25bbe07..b883b68a4ce5 100644 --- a/firestore/go.sum +++ b/firestore/go.sum @@ -324,7 +324,6 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys 
v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -401,8 +400,6 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U= -golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/firestore/integration_test.go b/firestore/integration_test.go index 40598e6df1c2..dfd769845b91 100644 --- a/firestore/integration_test.go +++ b/firestore/integration_test.go @@ -1720,56 +1720,3 @@ func TestIntegration_ColGroupRefPartitionsLarge(t *testing.T) { t.Errorf("Unexpected number of documents across partitions: got %d, want %d", got, want) } } - -func TestIntegration_BulkWriter(t *testing.T) { - doc := iColl.NewDoc() - c := integrationClient(t) - ctx := context.Background() - bw := c.BulkWriter(ctx) - - f := integrationTestMap - j, err := bw.Create(doc, f) - - if err != nil { - t.Errorf("bulkwriter: error creating write to database: %v\n", err) - } - - bw.Flush() // This blocks - res, err := j.Results() // so does this - - if err != nil { - t.Errorf("bulkwriter: error getting write results: %v\n", err) - } - - if res == nil { - t.Error("bulkwriter: write attempt returned nil results") - } - - numNewWrites := 21 // 20 is the threshold at which the bundler should start sending requests - var jobs []*BulkWriterJob - - // Test a slew of writes sent at the BulkWriter - for i := 0; i < numNewWrites; i++ { - d := iColl.NewDoc() - jb, err := bw.Create(d, f) - - if err != nil { - t.Errorf("bulkwriter: error creating write to database: %v\n", err) - } - - jobs = append(jobs, jb) - } - - bw.End() // This calls Flush() in the background. - - for _, j := range jobs { - res, err = j.Results() - if err != nil { - t.Errorf("bulkwriter: error getting write results: %v\n", err) - } - - if res == nil { - t.Error("bulkwriter: write attempt returned nil results") - } - } -} diff --git a/firestore/mock_test.go b/firestore/mock_test.go index ef40a92953b3..8b333744e8dd 100644 --- a/firestore/mock_test.go +++ b/firestore/mock_test.go @@ -227,11 +227,3 @@ func (s *mockServer) Listen(stream pb.Firestore_ListenServer) error { } return nil } - -func (s *mockServer) BatchWrite(_ context.Context, req *pb.BatchWriteRequest) (*pb.BatchWriteResponse, error) { - res, err := s.popRPC(req) - if err != nil { - return nil, err - } - return res.(*pb.BatchWriteResponse), nil -} diff --git a/firestore/writebatch.go b/firestore/writebatch.go index e1de0760ac37..aaa035738d13 100644 --- a/firestore/writebatch.go +++ b/firestore/writebatch.go @@ -26,10 +26,6 @@ import ( // Update and Delete methods, then run it with the Commit method. 
Errors in Create, // Set, Update or Delete are recorded instead of being returned immediately. The // first such error is returned by Commit. -// -// Deprecated: The WriteBatch API has been replaced with the transaction and -// the bulk writer API. For atomic transaction operations, use `Transaction`. -// For bulk read and write operations, use `BulkWriter`. type WriteBatch struct { c *Client err error diff --git a/internal/gapicgen/cmd/genbot/Dockerfile b/internal/gapicgen/cmd/genbot/Dockerfile index cb929d12b84a..9f1261f16328 100644 --- a/internal/gapicgen/cmd/genbot/Dockerfile +++ b/internal/gapicgen/cmd/genbot/Dockerfile @@ -27,7 +27,7 @@ RUN go install github.com/golang/protobuf/protoc-gen-go@v1.5.2 && \ go install golang.org/x/lint/golint@latest && \ go install golang.org/x/tools/cmd/goimports@latest && \ go install honnef.co/go/tools/cmd/staticcheck@latest && \ - go install github.com/googleapis/gapic-generator-go/cmd/protoc-gen-go_gapic@v0.31.2 + go install github.com/googleapis/gapic-generator-go/cmd/protoc-gen-go_gapic@v0.31.0 ENV PATH="${PATH}:/root/go/bin" # Source: http://debuggable.com/posts/disable-strict-host-checking-for-git-clone:49896ff3-0ac0-4263-9703-1eae4834cda3 diff --git a/profiler/integration_test.go b/profiler/integration_test.go index e0a84db1ecef..cca1ed590388 100644 --- a/profiler/integration_test.go +++ b/profiler/integration_test.go @@ -118,17 +118,13 @@ echo "{{.FinishString}}" {{ define "integration_backoff" -}} {{- template "prologue" . }} {{- template "setup" . }} - -# Compile first so each spawned process can just use the same binary. -go build busybench.go - # Do not display commands being run to simplify logging output. set +x # Run benchmarks with agent. echo "Starting {{.NumBackoffBenchmarks}} benchmarks." for (( i = 0; i < {{.NumBackoffBenchmarks}}; i++ )); do - (./busybench --service="{{.Service}}" --duration={{.DurationSec}} \ + (go run busybench.go --service="{{.Service}}" --duration={{.DurationSec}} \ --num_busyworkers=1) |& while read line; \ do echo "benchmark $i: ${line}"; done & done diff --git a/profiler/kokoro/integration_test.sh b/profiler/kokoro/integration_test.sh index 8d532de09160..8a655dc23316 100755 --- a/profiler/kokoro/integration_test.sh +++ b/profiler/kokoro/integration_test.sh @@ -44,9 +44,14 @@ export GIMME_GO_VERSION=1.18.4 export GIMME_ENV_PREFIX=/tmp/gimme_envs install_go() { "$GIMME" - # If gimme fails, this file will not exists, source will fail, and install_go - # will be retried. - source "${GIMME_ENV_PREFIX}/go${GIMME_GO_VERSION}.env" + # If gimme was successful, an .env script that sets up proper environment + # variables will be created. 
+ if [[ -f "${GIMME_ENV_PREFIX}/go${GIMME_GO_VERSION}.env" ]] + then + source "${GIMME_ENV_PREFIX}/go${GIMME_GO_VERSION}.env" + else + return 1 + fi } retry install_go diff --git a/storage/.release-please-manifest.json b/storage/.release-please-manifest.json index 7e1a2feb7072..110758e0c145 100644 --- a/storage/.release-please-manifest.json +++ b/storage/.release-please-manifest.json @@ -1,3 +1,3 @@ { - "storage": "1.24.0" + "storage": "1.23.0" } \ No newline at end of file diff --git a/storage/CHANGES.md b/storage/CHANGES.md index 75b3f62deb89..4a80b8ff4cfe 100644 --- a/storage/CHANGES.md +++ b/storage/CHANGES.md @@ -1,13 +1,6 @@ # Changes -## [1.24.0](https://github.com/googleapis/google-cloud-go/compare/storage/v1.23.0...storage/v1.24.0) (2022-07-20) - - -### Features - -* **storage:** add Custom Placement Config Dual Region Support ([#6294](https://github.com/googleapis/google-cloud-go/issues/6294)) ([5a8c607](https://github.com/googleapis/google-cloud-go/commit/5a8c607e3a9a3265887e27cb13f8943f3e3fa23d)) - ## [1.23.0](https://github.com/googleapis/google-cloud-go/compare/storage/v1.22.1...storage/v1.23.0) (2022-06-23) diff --git a/storage/client.go b/storage/client.go index ef443b755853..7117d5820d79 100644 --- a/storage/client.go +++ b/storage/client.go @@ -278,33 +278,34 @@ type newRangeReaderParams struct { type composeObjectRequest struct { dstBucket string - dstObject destinationObject - srcs []sourceObject + dstObject composeDstObject + srcs []composeSrcObject predefinedACL string encryptionKey []byte sendCRC32C bool } -type sourceObject struct { - name string - bucket string - gen int64 - conds *Conditions - encryptionKey []byte +type composeSrcObject struct { + name string + gen int64 + conds *Conditions } -type destinationObject struct { - name string - bucket string - conds *Conditions - attrs *ObjectAttrs // attrs to set on the destination object. - encryptionKey []byte - keyName string +type composeDstObject struct { + name string + conds *Conditions + attrs *ObjectAttrs // attrs to set on the destination object. } type rewriteObjectRequest struct { - srcObject sourceObject - dstObject destinationObject + srcBucket string + srcObject string + dstBucket string + dstObject string + dstKeyName string + attrs *ObjectAttrs + gen int64 + conds *Conditions predefinedACL string token string } diff --git a/storage/client_test.go b/storage/client_test.go index a32d5b9ef007..cb667e30f037 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -276,51 +276,6 @@ func TestGetObjectEmulated(t *testing.T) { }) } -func TestRewriteObjectEmulated(t *testing.T) { - transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { - // Populate test object. 
- _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ - Name: bucket, - }) - if err != nil { - t.Fatalf("client.CreateBucket: %v", err) - } - src := ObjectAttrs{ - Bucket: bucket, - Name: fmt.Sprintf("testObject-%d", time.Now().Nanosecond()), - } - w := veneerClient.Bucket(bucket).Object(src.Name).NewWriter(context.Background()) - if _, err := w.Write(randomBytesToWrite); err != nil { - t.Fatalf("failed to populate test object: %v", err) - } - if err := w.Close(); err != nil { - t.Fatalf("closing object: %v", err) - } - req := &rewriteObjectRequest{ - dstObject: destinationObject{ - bucket: bucket, - name: fmt.Sprintf("copy-of-%s", src.Name), - attrs: &ObjectAttrs{}, - }, - srcObject: sourceObject{ - bucket: bucket, - name: src.Name, - gen: defaultGen, - }, - } - got, err := client.RewriteObject(context.Background(), req) - if err != nil { - t.Fatal(err) - } - if !got.done { - t.Fatal("didn't finish writing!") - } - if want := int64(len(randomBytesToWrite)); got.written != want { - t.Errorf("Bytes written: got %d, want %d", got.written, want) - } - }) -} - func TestUpdateObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. @@ -1031,11 +986,11 @@ func TestComposeEmulated(t *testing.T) { dstName := fmt.Sprintf("%d-object3", prefix) req := composeObjectRequest{ dstBucket: bucket, - dstObject: destinationObject{ + dstObject: composeDstObject{ name: dstName, attrs: &ObjectAttrs{StorageClass: "COLDLINE"}, }, - srcs: []sourceObject{ + srcs: []composeSrcObject{ {name: srcNames[0]}, {name: srcNames[1]}, }, diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 110baebb86fb..4bf8f6baefb2 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -766,56 +766,7 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec return newObjectFromProto(obj), nil } func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { - s := callSettings(c.settings, opts...) - obj := req.dstObject.attrs.toProtoObject("") - call := &storagepb.RewriteObjectRequest{ - SourceBucket: bucketResourceName(globalProjectAlias, req.srcObject.bucket), - SourceObject: req.srcObject.name, - RewriteToken: req.token, - DestinationBucket: bucketResourceName(globalProjectAlias, req.dstObject.bucket), - DestinationName: req.dstObject.name, - Destination: obj, - DestinationKmsKey: req.dstObject.keyName, - DestinationPredefinedAcl: req.predefinedACL, - } - - // The userProject, whether source or destination project, is decided by the code calling the interface. 
- if s.userProject != "" { - ctx = setUserProjectMetadata(ctx, s.userProject) - } - if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil { - return nil, err - } - if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil { - return nil, err - } - - if len(req.dstObject.encryptionKey) > 0 { - call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey) - } - if len(req.srcObject.encryptionKey) > 0 { - srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey) - call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm() - call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes() - call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes() - } - var res *storagepb.RewriteResponse - var err error - - retryCall := func() error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err } - - if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)); err != nil { - return nil, err - } - - r := &rewriteObjectResponse{ - done: res.GetDone(), - written: res.GetTotalBytesRewritten(), - token: res.GetRewriteToken(), - resource: newObjectFromProto(res.GetResource()), - } - - return r, nil + return nil, errMethodNotSupported } func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { diff --git a/storage/http_client.go b/storage/http_client.go index ffa2e3699436..a1c26cda81a1 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -716,55 +716,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec return newObject(obj), nil } func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { - s := callSettings(c.settings, opts...) - rawObject := req.dstObject.attrs.toRawObject("") - call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject) - - call.Context(ctx).Projection("full") - if req.token != "" { - call.RewriteToken(req.token) - } - if req.dstObject.keyName != "" { - call.DestinationKmsKeyName(req.dstObject.keyName) - } - if req.predefinedACL != "" { - call.DestinationPredefinedAcl(req.predefinedACL) - } - if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil { - return nil, err - } - if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil { - return nil, err - } - if s.userProject != "" { - call.UserProject(s.userProject) - } - // Set destination encryption headers. - if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil { - return nil, err - } - // Set source encryption headers. 
- if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil { - return nil, err - } - var res *raw.RewriteResponse - var err error - setClientHeader(call.Header()) - - retryCall := func() error { res, err = call.Do(); return err } - - if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { - return nil, err - } - - r := &rewriteObjectResponse{ - done: res.Done, - written: res.TotalBytesRewritten, - token: res.RewriteToken, - resource: newObject(res.Resource), - } - - return r, nil + return nil, errMethodNotSupported } func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { diff --git a/storage/integration_test.go b/storage/integration_test.go index 30b7765b9357..20af97cb8e12 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2362,7 +2362,7 @@ func TestIntegration_ACL(t *testing.T) { for _, obj := range aclObjects { c := randomContents() if err := writeObject(ctx, bkt.Object(obj), "", c); err != nil { - return fmt.Errorf("Write for %v failed with %v", obj, err) + t.Errorf("Write for %v failed with %v", obj, err) } } acl, err = o.ACL().List(ctx) @@ -2377,7 +2377,7 @@ func TestIntegration_ACL(t *testing.T) { return nil }) if err != nil { - t.Fatal(err) + t.Error(err) } if err := o.ACL().Delete(ctx, entity); err != nil { t.Errorf("object ACL: could not delete entity %s", entity) @@ -4791,7 +4791,7 @@ func (h testHelper) mustNewReader(obj *ObjectHandle) *Reader { } func writeObject(ctx context.Context, obj *ObjectHandle, contentType string, contents []byte) error { - w := obj.Retryer(WithPolicy(RetryAlways)).NewWriter(ctx) + w := obj.NewWriter(ctx) w.ContentType = contentType w.CacheControl = "public, max-age=60" if contents != nil { diff --git a/storage/internal/version.go b/storage/internal/version.go index 291a237fe1cd..639553700322 100644 --- a/storage/internal/version.go +++ b/storage/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.24.0" +const Version = "1.23.0" diff --git a/storage/storage.go b/storage/storage.go index d6634e3dbca1..79ab04f60cd9 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1208,11 +1208,8 @@ func (o *ObjectAttrs) toProtoObject(b string) *storagepb.Object { } // For now, there are only globally unique buckets, and "_" is the alias - // project ID for such buckets. If the bucket is not provided, like in the - // destination ObjectAttrs of a Copy, do not attempt to format it. - if b != "" { - b = bucketResourceName(globalProjectAlias, b) - } + // project ID for such buckets. 
+ b = bucketResourceName("_", b) return &storagepb.Object{ Bucket: b, @@ -1841,33 +1838,6 @@ func applySourceConds(gen int64, conds *Conditions, call *raw.ObjectsRewriteCall return nil } -func applySourceCondsProto(gen int64, conds *Conditions, call *storagepb.RewriteObjectRequest) error { - if gen >= 0 { - call.SourceGeneration = gen - } - if conds == nil { - return nil - } - if err := conds.validate("CopyTo source"); err != nil { - return err - } - switch { - case conds.GenerationMatch != 0: - call.IfSourceGenerationMatch = proto.Int64(conds.GenerationMatch) - case conds.GenerationNotMatch != 0: - call.IfSourceGenerationNotMatch = proto.Int64(conds.GenerationNotMatch) - case conds.DoesNotExist: - call.IfSourceGenerationMatch = proto.Int64(0) - } - switch { - case conds.MetagenerationMatch != 0: - call.IfSourceMetagenerationMatch = proto.Int64(conds.MetagenerationMatch) - case conds.MetagenerationNotMatch != 0: - call.IfSourceMetagenerationNotMatch = proto.Int64(conds.MetagenerationNotMatch) - } - return nil -} - // setConditionField sets a field on a *raw.WhateverCall. // We can't use anonymous interfaces because the return type is // different, since the field setters are builders. diff --git a/vmmigration/CHANGES.md b/vmmigration/CHANGES.md index 84bbb28b8228..4f65c88dc798 100644 --- a/vmmigration/CHANGES.md +++ b/vmmigration/CHANGES.md @@ -1,13 +1,6 @@ # Changes -## [1.1.0](https://github.com/googleapis/google-cloud-go/compare/vmmigration/v1.0.0...vmmigration/v1.1.0) (2022-07-19) - - -### Features - -* **vmmigration:** Rename product feat: API updates ([53246aa](https://github.com/googleapis/google-cloud-go/commit/53246aa18cb9c79471ecc84878b5e3f166086404)) - ## [1.0.0](https://github.com/googleapis/google-cloud-go/compare/vmmigration/v0.3.0...vmmigration/v1.0.0) (2022-06-29) diff --git a/vmmigration/internal/version.go b/vmmigration/internal/version.go index 6aea1adc121f..db6d2e3e99d1 100644 --- a/vmmigration/internal/version.go +++ b/vmmigration/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.1.0" +const Version = "1.0.0" From 4ab0845804e5e625586e8756fcae6f6915cbd039 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Mon, 25 Jul 2022 12:54:38 -0400 Subject: [PATCH 3/6] chore(storage): integrate Compose in new interface Change production code paths for Composer to use the new transport agnostic interface. Required a few small changes to make tests pass: * Add initialization of tc to NewClient * Add new useGRPC flag to remove switches on tc in Reader/Writer * A unit test checks that setting Generation with composer causes a failure. Currently this is enforced by applyConds. I added a separate validation for this instead, but I could do a pass- through of the Generation through the interface -- it's just confusing looking because setting Generation on a new object is never valid. 
--- storage/client.go | 1 - storage/copy.go | 63 ++++++++++++++++++++---------------------- storage/grpc_client.go | 6 ++-- storage/http_client.go | 4 +-- storage/reader.go | 2 +- storage/storage.go | 12 +++++++- storage/writer.go | 4 +-- 7 files changed, 49 insertions(+), 43 deletions(-) diff --git a/storage/client.go b/storage/client.go index ef443b755853..cb2c108d7625 100644 --- a/storage/client.go +++ b/storage/client.go @@ -281,7 +281,6 @@ type composeObjectRequest struct { dstObject destinationObject srcs []sourceObject predefinedACL string - encryptionKey []byte sendCRC32C bool } diff --git a/storage/copy.go b/storage/copy.go index 88e1daefd025..c95fd9742de4 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -188,6 +188,9 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { if err := c.dst.validate(); err != nil { return nil, err } + if c.dst.gen != defaultGen { + return nil, fmt.Errorf("storage: generation cannot be specified on compose destination, got %v", c.dst.gen) + } if len(c.srcs) == 0 { return nil, errors.New("storage: at least one source object must be specified") } @@ -204,45 +207,39 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { } } - // TODO: transport agnostic interface starts here. - req := &raw.ComposeRequest{} - // Compose requires a non-empty Destination, so we always set it, - // even if the caller-provided ObjectAttrs is the zero value. - req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket) - if c.SendCRC32C { - req.Destination.Crc32c = encodeUint32(c.ObjectAttrs.CRC32C) - } - for _, src := range c.srcs { - srcObj := &raw.ComposeRequestSourceObjects{ - Name: src.object, - } - if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil { - return nil, err + req := &composeObjectRequest{ + dstBucket: c.dst.bucket, + predefinedACL: c.PredefinedACL, + sendCRC32C: c.SendCRC32C, + } + req.dstObject = destinationObject{ + name: c.dst.object, + bucket: c.dst.bucket, + conds: c.dst.conds, + attrs: &c.ObjectAttrs, + encryptionKey: c.dst.encryptionKey, + } + for _, src := range(c.srcs) { + s := sourceObject{ + name: src.object, + bucket: src.bucket, + gen: src.gen, + conds: src.conds, } - req.SourceObjects = append(req.SourceObjects, srcObj) + req.srcs = append(req.srcs, s) } - call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx) - if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil { - return nil, err + // TODO: factor this out to a function? + isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + opts := []storageOption{idempotent(isIdempotent)} + if c.dst.retry != nil { + opts = append(opts, withRetryConfig(c.dst.retry)) } if c.dst.userProject != "" { - call.UserProject(c.dst.userProject) + opts = append(opts, withUserProject(c.dst.userProject)) } - if c.PredefinedACL != "" { - call.DestinationPredefinedAcl(c.PredefinedACL) - } - if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { - return nil, err - } - var obj *raw.Object - setClientHeader(call.Header()) - retryCall := func() error { obj, err = call.Do(); return err } - isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + // TODO: Need to add withClientOptions or withGAXOptions? 
- if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil { - return nil, err - } - return newObject(obj), nil + return c.dst.c.tc.ComposeObject(ctx, req, opts...) } diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 110baebb86fb..9d253369aebb 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -727,7 +727,7 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket) dstObjPb.Name = req.dstObject.name - if err := applyCondsProto("ComposeObject destination", -1, req.dstObject.conds, dstObjPb); err != nil { + if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, dstObjPb); err != nil { return nil, err } if req.sendCRC32C { @@ -750,8 +750,8 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec if req.predefinedACL != "" { rawReq.DestinationPredefinedAcl = req.predefinedACL } - if req.encryptionKey != nil { - rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.encryptionKey) + if req.dstObject.encryptionKey != nil { + rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey) } var obj *storagepb.Object diff --git a/storage/http_client.go b/storage/http_client.go index ffa2e3699436..996adff9fbf8 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -692,7 +692,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec } call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq).Context(ctx) - if err := applyConds("ComposeFrom destination", -1, req.dstObject.conds, call); err != nil { + if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil { return nil, err } if s.userProject != "" { @@ -701,7 +701,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec if req.predefinedACL != "" { call.DestinationPredefinedAcl(req.predefinedACL) } - if err := setEncryptionHeaders(call.Header(), req.encryptionKey, false); err != nil { + if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil { return nil, err } var obj *raw.Object diff --git a/storage/reader.go b/storage/reader.go index 7a28b6745e7d..ca3e42a9bc65 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -94,7 +94,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader") defer func() { trace.EndSpan(ctx, err) }() - if o.c.tc != nil { + if o.c.useGRPC { return o.newRangeReaderWithGRPC(ctx, offset, length) } diff --git a/storage/storage.go b/storage/storage.go index d6634e3dbca1..086d2dfc7654 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -115,6 +115,10 @@ type Client struct { // tc is the transport-agnostic client implemented with either gRPC or HTTP. tc storageClient + // useGRPC flags whether the client uses gRPC. This is needed while the + // integration piece is only partially complete. + // TODO: remove before merging to main. + useGRPC bool } // NewClient creates a new Google Cloud Storage client. 
@@ -195,12 +199,18 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error return nil, fmt.Errorf("supplied endpoint %q is not valid: %v", ep, err) } + tc, err := newHTTPStorageClient(ctx, withClientOptions(opts...)) + if err != nil { + return nil, fmt.Errorf("storage: %v", err) + } + return &Client{ hc: hc, raw: rawService, scheme: u.Scheme, readHost: u.Host, creds: creds, + tc: tc, }, nil } @@ -215,7 +225,7 @@ func newGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e return nil, err } - return &Client{tc: tc}, nil + return &Client{tc: tc, useGRPC: true}, nil } // Close closes the Client. diff --git a/storage/writer.go b/storage/writer.go index 0a9bc2bcee46..c6be339bf728 100644 --- a/storage/writer.go +++ b/storage/writer.go @@ -236,7 +236,7 @@ func (w *Writer) Write(p []byte) (n int, err error) { } if !w.opened { // gRPC client has been initialized - use gRPC to upload. - if w.o.c.tc != nil { + if w.o.c.useGRPC { if err := w.openWriter(); err != nil { return 0, err } @@ -264,7 +264,7 @@ func (w *Writer) Write(p []byte) (n int, err error) { // can be retrieved by calling Attrs. func (w *Writer) Close() error { if !w.opened { - if w.o.c.tc != nil { + if w.o.c.useGRPC { if err := w.openWriter(); err != nil { return err } From d6cff41d1aef41c326014b2775b78d2e14b3664e Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Mon, 25 Jul 2022 15:18:13 -0400 Subject: [PATCH 4/6] gofmt --- storage/copy.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/storage/copy.go b/storage/copy.go index c95fd9742de4..7e00dc8dfe76 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -208,23 +208,23 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { } req := &composeObjectRequest{ - dstBucket: c.dst.bucket, + dstBucket: c.dst.bucket, predefinedACL: c.PredefinedACL, - sendCRC32C: c.SendCRC32C, + sendCRC32C: c.SendCRC32C, } req.dstObject = destinationObject{ - name: c.dst.object, - bucket: c.dst.bucket, - conds: c.dst.conds, - attrs: &c.ObjectAttrs, + name: c.dst.object, + bucket: c.dst.bucket, + conds: c.dst.conds, + attrs: &c.ObjectAttrs, encryptionKey: c.dst.encryptionKey, } - for _, src := range(c.srcs) { + for _, src := range c.srcs { s := sourceObject{ - name: src.object, + name: src.object, bucket: src.bucket, - gen: src.gen, - conds: src.conds, + gen: src.gen, + conds: src.conds, } req.srcs = append(req.srcs, s) } From db32bf09cdce2f40d9a373693dd66812fefc96de Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 26 Jul 2022 13:19:31 -0400 Subject: [PATCH 5/6] gofmt again --- storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/storage.go b/storage/storage.go index 086d2dfc7654..a4f7d6088c19 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -210,7 +210,7 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error scheme: u.Scheme, readHost: u.Host, creds: creds, - tc: tc, + tc: tc, }, nil } From 54a65a4a339a12117a2015ed3e55cdcd03446fa2 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 26 Jul 2022 13:48:40 -0400 Subject: [PATCH 6/6] remove todos --- storage/copy.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/storage/copy.go b/storage/copy.go index 7e00dc8dfe76..1478d275cdef 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -229,7 +229,6 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { req.srcs = append(req.srcs, s) } - // TODO: factor this out to a function? 
isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) opts := []storageOption{idempotent(isIdempotent)} if c.dst.retry != nil { @@ -239,7 +238,5 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { opts = append(opts, withUserProject(c.dst.userProject)) } - // TODO: Need to add withClientOptions or withGAXOptions? - return c.dst.c.tc.ComposeObject(ctx, req, opts...) }