Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): migrate Writer #6452

Merged
merged 3 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ const (
// This is only used for the gRPC client.
defaultConnPoolSize = 4

// maxPerMessageWriteSize is the maximum amount of content that can be sent
// per WriteObjectRequest message. A buffer reaching this amount will
// precipitate a flush of the buffer. It is only used by the gRPC Writer
// implementation.
maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)

// globalProjectAlias is the project ID alias used for global buckets.
//
// This is only used for the gRPC API.
Expand Down Expand Up @@ -1647,8 +1653,8 @@ func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
spec := &storagepb.WriteObjectSpec{
Resource: attrs.toProtoObject(w.bucket),
}
// WriteObject doesn't support the generation condition, so use -1.
if err := applyCondsProto("WriteObject", -1, w.conds, spec); err != nil {
// WriteObject doesn't support the generation condition, so use default.
if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil {
return nil, err
}
return spec, nil
Expand Down
2 changes: 1 addition & 1 deletion storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage
return
}
var resp *raw.Object
err := applyConds("NewWriter", params.attrs.Generation, params.conds, call)
err := applyConds("NewWriter", defaultGen, params.conds, call)
if err == nil {
if s.userProject != "" {
call.UserProject(s.userProject)
Expand Down
2 changes: 1 addition & 1 deletion storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func TestCondition(t *testing.T) {

// Test an error, too:
err = obj.Generation(1234).NewWriter(ctx).Close()
if err == nil || !strings.Contains(err.Error(), "NewWriter: generation not supported") {
if err == nil || !strings.Contains(err.Error(), "storage: generation not supported") {
t.Errorf("want error about unsupported generation; got %v", err)
}
}
Expand Down
129 changes: 8 additions & 121 deletions storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,12 @@ package storage

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"sync"
"time"
"unicode/utf8"

storagepb "cloud.google.com/go/storage/internal/apiv2/stubs"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
)

const (
// Maximum amount of content that can be sent per WriteObjectRequest message.
// A buffer reaching this amount will precipitate a flush of the buffer.
//
// This is only used for the gRPC-based Writer.
maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
)

// A Writer writes a Cloud Storage object.
Expand Down Expand Up @@ -122,102 +109,6 @@ type Writer struct {
err error
}

func (w *Writer) open() error {
if err := w.validateWriteAttrs(); err != nil {
return err
}

pr, pw := io.Pipe()
w.pw = pw
w.opened = true

go w.monitorCancel()

attrs := w.ObjectAttrs
mediaOpts := []googleapi.MediaOption{
googleapi.ChunkSize(w.ChunkSize),
}
if c := attrs.ContentType; c != "" {
mediaOpts = append(mediaOpts, googleapi.ContentType(c))
}
if w.ChunkRetryDeadline != 0 {
mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(w.ChunkRetryDeadline))
}

go func() {
defer close(w.donec)

rawObj := attrs.toRawObject(w.o.bucket)
if w.SendCRC32C {
rawObj.Crc32c = encodeUint32(attrs.CRC32C)
}
if w.MD5 != nil {
rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5)
}
call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj).
Media(pr, mediaOpts...).
Projection("full").
Context(w.ctx).
Name(w.o.object)

if w.ProgressFunc != nil {
call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) })
}
if attrs.KMSKeyName != "" {
call.KmsKeyName(attrs.KMSKeyName)
}
if attrs.PredefinedACL != "" {
call.PredefinedAcl(attrs.PredefinedACL)
}
if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil {
w.mu.Lock()
w.err = err
w.mu.Unlock()
pr.CloseWithError(err)
return
}
var resp *raw.Object
err := applyConds("NewWriter", w.o.gen, w.o.conds, call)
if err == nil {
if w.o.userProject != "" {
call.UserProject(w.o.userProject)
}
setClientHeader(call.Header())

// The internals that perform call.Do automatically retry both the initial
// call to set up the upload as well as calls to upload individual chunks
// for a resumable upload (as long as the chunk size is non-zero). Hence
// there is no need to add retries here.

// Retry only when the operation is idempotent or the retry policy is RetryAlways.
isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true)
var useRetry bool
if (w.o.retry == nil || w.o.retry.policy == RetryIdempotent) && isIdempotent {
useRetry = true
} else if w.o.retry != nil && w.o.retry.policy == RetryAlways {
useRetry = true
}
if useRetry {
if w.o.retry != nil {
call.WithRetry(w.o.retry.backoff, w.o.retry.shouldRetry)
} else {
call.WithRetry(nil, nil)
}
}
resp, err = call.Do()
}
if err != nil {
w.mu.Lock()
w.err = err
w.mu.Unlock()
pr.CloseWithError(err)
return
}
w.obj = newObject(resp)
}()
return nil
}

// Write appends to w. It implements the io.Writer interface.
//
// Since writes happen asynchronously, Write may return a nil
Expand All @@ -235,12 +126,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
return 0, werr
}
if !w.opened {
// gRPC client has been initialized - use gRPC to upload.
if w.o.c.useGRPC {
if err := w.openWriter(); err != nil {
return 0, err
}
} else if err := w.open(); err != nil {
if err := w.openWriter(); err != nil {
return 0, err
}
}
Expand All @@ -264,11 +150,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
// can be retrieved by calling Attrs.
func (w *Writer) Close() error {
if !w.opened {
if w.o.c.useGRPC {
if err := w.openWriter(); err != nil {
return err
}
} else if err := w.open(); err != nil {
if err := w.openWriter(); err != nil {
return err
}
}
Expand All @@ -288,7 +170,12 @@ func (w *Writer) openWriter() (err error) {
if err := w.validateWriteAttrs(); err != nil {
return err
}
if w.o.gen != defaultGen {
return fmt.Errorf("storage: generation not supported on Writer, got %v", w.o.gen)
}

isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true)
opts := makeStorageOpts(isIdempotent, w.o.retry, w.o.userProject)
go w.monitorCancel()
params := &openWriterParams{
ctx: w.ctx,
Expand All @@ -304,7 +191,7 @@ func (w *Writer) openWriter() (err error) {
progress: w.progress,
setObj: func(o *ObjectAttrs) { w.obj = o },
}
w.pw, err = w.o.c.tc.OpenWriter(params)
w.pw, err = w.o.c.tc.OpenWriter(params, opts...)
if err != nil {
return err
}
Expand Down