chore(storage): migrate Writer (#6452)
Migrate Writer to use the transport-agnostic interface. A few
minor fixes:

* Pass retry, idempotency, and the user project in via settings rather
than determining them below the interface (see the sketch after this list).
* Use the default generation, since a generation cannot be sent for this call.
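For readers unfamiliar with the settings-based approach the first bullet
describes, here is a minimal, self-contained Go sketch of the pattern: the
caller decides idempotency, retry policy, and user project and hands them to
the transport-agnostic layer as options, instead of each transport re-deriving
them from Writer internals. The names below (callSettings, option,
withIdempotency, withRetryPolicy, withUserProject, shouldRetry) are
illustrative only; the library's actual mechanism is the unexported
storageOption/makeStorageOpts machinery that appears in the diff further down.

// settings_sketch.go — illustrative only, not the library's API.
package main

import "fmt"

type retryPolicy int

const (
	retryIdempotent retryPolicy = iota // retry only idempotent calls (default)
	retryAlways                        // retry regardless of idempotency
	retryNever                         // never retry
)

// callSettings is what a transport-agnostic call receives; the transport never
// has to inspect Writer internals such as preconditions.
type callSettings struct {
	idempotent  bool
	policy      retryPolicy
	userProject string
}

// option mutates callSettings, in the spirit of storageOption in this commit.
type option func(*callSettings)

func withIdempotency(v bool) option {
	return func(s *callSettings) { s.idempotent = v }
}

func withRetryPolicy(p retryPolicy) option {
	return func(s *callSettings) { s.policy = p }
}

func withUserProject(project string) option {
	return func(s *callSettings) { s.userProject = project }
}

// shouldRetry mirrors the decision the old Writer.open made inline: retry when
// the policy is retryAlways, or when it is retryIdempotent and the call itself
// is idempotent.
func shouldRetry(s callSettings) bool {
	switch s.policy {
	case retryAlways:
		return true
	case retryNever:
		return false
	default:
		return s.idempotent
	}
}

func main() {
	// The Writer computes idempotency from its preconditions (GenerationMatch
	// set, or DoesNotExist) and passes everything down as options.
	opts := []option{
		withIdempotency(true),
		withRetryPolicy(retryIdempotent),
		withUserProject("my-billing-project"),
	}
	var s callSettings
	for _, o := range opts {
		o(&s)
	}
	fmt.Printf("settings: %+v, retry: %v\n", s, shouldRetry(s))
}

Either transport (HTTP or gRPC) then reads these settings when opening the
upload, which is what the OpenWriter(params, opts...) change at the bottom of
this diff wires up.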
tritone authored Aug 1, 2022
1 parent 5ee0de3 commit 728ab63
Showing 4 changed files with 18 additions and 125 deletions.
10 changes: 8 additions & 2 deletions storage/grpc_client.go
@@ -45,6 +45,12 @@ const (
// This is only used for the gRPC client.
defaultConnPoolSize = 4

+// maxPerMessageWriteSize is the maximum amount of content that can be sent
+// per WriteObjectRequest message. A buffer reaching this amount will
+// precipitate a flush of the buffer. It is only used by the gRPC Writer
+// implementation.
+maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)

// globalProjectAlias is the project ID alias used for global buckets.
//
// This is only used for the gRPC API.
@@ -1649,8 +1655,8 @@ func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
spec := &storagepb.WriteObjectSpec{
Resource: attrs.toProtoObject(w.bucket),
}
-// WriteObject doesn't support the generation condition, so use -1.
-if err := applyCondsProto("WriteObject", -1, w.conds, spec); err != nil {
+// WriteObject doesn't support the generation condition, so use default.
+if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil {
return nil, err
}
return spec, nil
2 changes: 1 addition & 1 deletion storage/http_client.go
@@ -1027,7 +1027,7 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage
return
}
var resp *raw.Object
-err := applyConds("NewWriter", params.attrs.Generation, params.conds, call)
+err := applyConds("NewWriter", defaultGen, params.conds, call)
if err == nil {
if s.userProject != "" {
call.UserProject(s.userProject)
2 changes: 1 addition & 1 deletion storage/storage_test.go
@@ -922,7 +922,7 @@ func TestCondition(t *testing.T) {

// Test an error, too:
err = obj.Generation(1234).NewWriter(ctx).Close()
-if err == nil || !strings.Contains(err.Error(), "NewWriter: generation not supported") {
+if err == nil || !strings.Contains(err.Error(), "storage: generation not supported") {
t.Errorf("want error about unsupported generation; got %v", err)
}
}
129 changes: 8 additions & 121 deletions storage/writer.go
@@ -16,25 +16,12 @@ package storage

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"sync"
"time"
"unicode/utf8"

-storagepb "cloud.google.com/go/storage/internal/apiv2/stubs"
-"google.golang.org/api/googleapi"
-raw "google.golang.org/api/storage/v1"
)

-const (
-// Maximum amount of content that can be sent per WriteObjectRequest message.
-// A buffer reaching this amount will precipitate a flush of the buffer.
-//
-// This is only used for the gRPC-based Writer.
-maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
-)

// A Writer writes a Cloud Storage object.
@@ -122,102 +109,6 @@ type Writer struct {
err error
}

-func (w *Writer) open() error {
-if err := w.validateWriteAttrs(); err != nil {
-return err
-}
-
-pr, pw := io.Pipe()
-w.pw = pw
-w.opened = true
-
-go w.monitorCancel()
-
-attrs := w.ObjectAttrs
-mediaOpts := []googleapi.MediaOption{
-googleapi.ChunkSize(w.ChunkSize),
-}
-if c := attrs.ContentType; c != "" {
-mediaOpts = append(mediaOpts, googleapi.ContentType(c))
-}
-if w.ChunkRetryDeadline != 0 {
-mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(w.ChunkRetryDeadline))
-}
-
-go func() {
-defer close(w.donec)
-
-rawObj := attrs.toRawObject(w.o.bucket)
-if w.SendCRC32C {
-rawObj.Crc32c = encodeUint32(attrs.CRC32C)
-}
-if w.MD5 != nil {
-rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5)
-}
-call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj).
-Media(pr, mediaOpts...).
-Projection("full").
-Context(w.ctx).
-Name(w.o.object)
-
-if w.ProgressFunc != nil {
-call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) })
-}
-if attrs.KMSKeyName != "" {
-call.KmsKeyName(attrs.KMSKeyName)
-}
-if attrs.PredefinedACL != "" {
-call.PredefinedAcl(attrs.PredefinedACL)
-}
-if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil {
-w.mu.Lock()
-w.err = err
-w.mu.Unlock()
-pr.CloseWithError(err)
-return
-}
-var resp *raw.Object
-err := applyConds("NewWriter", w.o.gen, w.o.conds, call)
-if err == nil {
-if w.o.userProject != "" {
-call.UserProject(w.o.userProject)
-}
-setClientHeader(call.Header())
-
-// The internals that perform call.Do automatically retry both the initial
-// call to set up the upload as well as calls to upload individual chunks
-// for a resumable upload (as long as the chunk size is non-zero). Hence
-// there is no need to add retries here.
-
-// Retry only when the operation is idempotent or the retry policy is RetryAlways.
-isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true)
-var useRetry bool
-if (w.o.retry == nil || w.o.retry.policy == RetryIdempotent) && isIdempotent {
-useRetry = true
-} else if w.o.retry != nil && w.o.retry.policy == RetryAlways {
-useRetry = true
-}
-if useRetry {
-if w.o.retry != nil {
-call.WithRetry(w.o.retry.backoff, w.o.retry.shouldRetry)
-} else {
-call.WithRetry(nil, nil)
-}
-}
-resp, err = call.Do()
-}
-if err != nil {
-w.mu.Lock()
-w.err = err
-w.mu.Unlock()
-pr.CloseWithError(err)
-return
-}
-w.obj = newObject(resp)
-}()
-return nil
-}

// Write appends to w. It implements the io.Writer interface.
//
// Since writes happen asynchronously, Write may return a nil
@@ -235,12 +126,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
return 0, werr
}
if !w.opened {
-// gRPC client has been initialized - use gRPC to upload.
-if w.o.c.useGRPC {
-if err := w.openWriter(); err != nil {
-return 0, err
-}
-} else if err := w.open(); err != nil {
+if err := w.openWriter(); err != nil {
return 0, err
}
}
@@ -264,11 +150,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
// can be retrieved by calling Attrs.
func (w *Writer) Close() error {
if !w.opened {
-if w.o.c.useGRPC {
-if err := w.openWriter(); err != nil {
-return err
-}
-} else if err := w.open(); err != nil {
+if err := w.openWriter(); err != nil {
return err
}
}
@@ -288,7 +170,12 @@ func (w *Writer) openWriter() (err error) {
if err := w.validateWriteAttrs(); err != nil {
return err
}
+if w.o.gen != defaultGen {
+return fmt.Errorf("storage: generation not supported on Writer, got %v", w.o.gen)
+}

+isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true)
+opts := makeStorageOpts(isIdempotent, w.o.retry, w.o.userProject)
go w.monitorCancel()
params := &openWriterParams{
ctx: w.ctx,
@@ -304,7 +191,7 @@
progress: w.progress,
setObj: func(o *ObjectAttrs) { w.obj = o },
}
-w.pw, err = w.o.c.tc.OpenWriter(params)
+w.pw, err = w.o.c.tc.OpenWriter(params, opts...)
if err != nil {
return err
}
