Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): integrate Compose in new interface #6414

Merged
merged 11 commits into from
Jul 26, 2022
1 change: 0 additions & 1 deletion storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ type composeObjectRequest struct {
dstObject destinationObject
srcs []sourceObject
predefinedACL string
encryptionKey []byte
sendCRC32C bool
}

Expand Down
63 changes: 30 additions & 33 deletions storage/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
if err := c.dst.validate(); err != nil {
return nil, err
}
if c.dst.gen != defaultGen {
return nil, fmt.Errorf("storage: generation cannot be specified on compose destination, got %v", c.dst.gen)
}
if len(c.srcs) == 0 {
return nil, errors.New("storage: at least one source object must be specified")
}
Expand All @@ -204,45 +207,39 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
}
}

// TODO: transport agnostic interface starts here.
req := &raw.ComposeRequest{}
// Compose requires a non-empty Destination, so we always set it,
// even if the caller-provided ObjectAttrs is the zero value.
req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
if c.SendCRC32C {
req.Destination.Crc32c = encodeUint32(c.ObjectAttrs.CRC32C)
}
for _, src := range c.srcs {
srcObj := &raw.ComposeRequestSourceObjects{
Name: src.object,
}
if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
return nil, err
req := &composeObjectRequest{
dstBucket: c.dst.bucket,
predefinedACL: c.PredefinedACL,
sendCRC32C: c.SendCRC32C,
}
req.dstObject = destinationObject{
name: c.dst.object,
bucket: c.dst.bucket,
conds: c.dst.conds,
attrs: &c.ObjectAttrs,
encryptionKey: c.dst.encryptionKey,
}
for _, src := range(c.srcs) {
s := sourceObject{
name: src.object,
bucket: src.bucket,
gen: src.gen,
conds: src.conds,
}
req.SourceObjects = append(req.SourceObjects, srcObj)
req.srcs = append(req.srcs, s)
}

call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil {
return nil, err
// TODO: factor this out to a function?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the definition of idempotency here a generalized one? Or is it specific to Copy/Compose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's specific to copy/compose. However was thinking I could add a helper func to create the storage opts just to eliminate a few of these lines of boilerplate which will be in every func:

func makeStorageOpts(idempotent bool, retry *retryConfig, userProject string) []storageOption {...}

Maybe this defeats the point of the options interface though?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this defeats the point of the options interface though?

Perhaps...let's just keep the TODO and evaluate as we go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure sounds good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be adding this in a separate PR.

isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist)
opts := []storageOption{idempotent(isIdempotent)}
if c.dst.retry != nil {
opts = append(opts, withRetryConfig(c.dst.retry))
}
if c.dst.userProject != "" {
call.UserProject(c.dst.userProject)
opts = append(opts, withUserProject(c.dst.userProject))
}
if c.PredefinedACL != "" {
call.DestinationPredefinedAcl(c.PredefinedACL)
}
if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
return nil, err
}
var obj *raw.Object
setClientHeader(call.Header())

retryCall := func() error { obj, err = call.Do(); return err }
isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist)
// TODO: Need to add withClientOptions or withGAXOptions?

if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil {
return nil, err
}
return newObject(obj), nil
return c.dst.c.tc.ComposeObject(ctx, req, opts...)
}
6 changes: 3 additions & 3 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec

dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
dstObjPb.Name = req.dstObject.name
if err := applyCondsProto("ComposeObject destination", -1, req.dstObject.conds, dstObjPb); err != nil {
if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, dstObjPb); err != nil {
return nil, err
}
if req.sendCRC32C {
Expand All @@ -750,8 +750,8 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec
if req.predefinedACL != "" {
rawReq.DestinationPredefinedAcl = req.predefinedACL
}
if req.encryptionKey != nil {
rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.encryptionKey)
if req.dstObject.encryptionKey != nil {
rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
}

var obj *storagepb.Object
Expand Down
4 changes: 2 additions & 2 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec
}

call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq).Context(ctx)
if err := applyConds("ComposeFrom destination", -1, req.dstObject.conds, call); err != nil {
if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil {
return nil, err
}
if s.userProject != "" {
Expand All @@ -701,7 +701,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec
if req.predefinedACL != "" {
call.DestinationPredefinedAcl(req.predefinedACL)
}
if err := setEncryptionHeaders(call.Header(), req.encryptionKey, false); err != nil {
if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
return nil, err
}
var obj *raw.Object
Expand Down
2 changes: 1 addition & 1 deletion storage/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
defer func() { trace.EndSpan(ctx, err) }()

if o.c.tc != nil {
if o.c.useGRPC {
return o.newRangeReaderWithGRPC(ctx, offset, length)
}

Expand Down
12 changes: 11 additions & 1 deletion storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ type Client struct {

// tc is the transport-agnostic client implemented with either gRPC or HTTP.
tc storageClient
// useGRPC flags whether the client uses gRPC. This is needed while the
// integration piece is only partially complete.
// TODO: remove before merging to main.
useGRPC bool
}

// NewClient creates a new Google Cloud Storage client.
Expand Down Expand Up @@ -195,12 +199,18 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
return nil, fmt.Errorf("supplied endpoint %q is not valid: %v", ep, err)
}

tc, err := newHTTPStorageClient(ctx, withClientOptions(opts...))
if err != nil {
return nil, fmt.Errorf("storage: %v", err)
}

return &Client{
hc: hc,
raw: rawService,
scheme: u.Scheme,
readHost: u.Host,
creds: creds,
tc: tc,
}, nil
}

Expand All @@ -215,7 +225,7 @@ func newGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
return nil, err
}

return &Client{tc: tc}, nil
return &Client{tc: tc, useGRPC: true}, nil
}

// Close closes the Client.
Expand Down
4 changes: 2 additions & 2 deletions storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
}
if !w.opened {
// gRPC client has been initialized - use gRPC to upload.
if w.o.c.tc != nil {
if w.o.c.useGRPC {
if err := w.openWriter(); err != nil {
return 0, err
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
// can be retrieved by calling Attrs.
func (w *Writer) Close() error {
if !w.opened {
if w.o.c.tc != nil {
if w.o.c.useGRPC {
if err := w.openWriter(); err != nil {
return err
}
Expand Down