Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): integrate Compose in new interface #6414

Merged
merged 11 commits into from
Jul 26, 2022
1 change: 0 additions & 1 deletion storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ type composeObjectRequest struct {
dstObject destinationObject
srcs []sourceObject
predefinedACL string
encryptionKey []byte
sendCRC32C bool
}

Expand Down
58 changes: 26 additions & 32 deletions storage/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
if err := c.dst.validate(); err != nil {
return nil, err
}
if c.dst.gen != defaultGen {
return nil, fmt.Errorf("storage: generation cannot be specified on compose destination, got %v", c.dst.gen)
}
if len(c.srcs) == 0 {
return nil, errors.New("storage: at least one source object must be specified")
}
Expand All @@ -204,45 +207,36 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) {
}
}

// TODO: transport agnostic interface starts here.
req := &raw.ComposeRequest{}
// Compose requires a non-empty Destination, so we always set it,
// even if the caller-provided ObjectAttrs is the zero value.
req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket)
if c.SendCRC32C {
req.Destination.Crc32c = encodeUint32(c.ObjectAttrs.CRC32C)
req := &composeObjectRequest{
dstBucket: c.dst.bucket,
predefinedACL: c.PredefinedACL,
sendCRC32C: c.SendCRC32C,
}
req.dstObject = destinationObject{
name: c.dst.object,
bucket: c.dst.bucket,
conds: c.dst.conds,
attrs: &c.ObjectAttrs,
encryptionKey: c.dst.encryptionKey,
}
for _, src := range c.srcs {
srcObj := &raw.ComposeRequestSourceObjects{
Name: src.object,
s := sourceObject{
name: src.object,
bucket: src.bucket,
gen: src.gen,
conds: src.conds,
}
if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
return nil, err
}
req.SourceObjects = append(req.SourceObjects, srcObj)
req.srcs = append(req.srcs, s)
}

call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx)
if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil {
return nil, err
isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist)
opts := []storageOption{idempotent(isIdempotent)}
if c.dst.retry != nil {
opts = append(opts, withRetryConfig(c.dst.retry))
}
if c.dst.userProject != "" {
call.UserProject(c.dst.userProject)
}
if c.PredefinedACL != "" {
call.DestinationPredefinedAcl(c.PredefinedACL)
opts = append(opts, withUserProject(c.dst.userProject))
}
if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil {
return nil, err
}
var obj *raw.Object
setClientHeader(call.Header())

retryCall := func() error { obj, err = call.Do(); return err }
isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist)

if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil {
return nil, err
}
return newObject(obj), nil
return c.dst.c.tc.ComposeObject(ctx, req, opts...)
}
6 changes: 3 additions & 3 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec

dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
dstObjPb.Name = req.dstObject.name
if err := applyCondsProto("ComposeObject destination", -1, req.dstObject.conds, dstObjPb); err != nil {
if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, dstObjPb); err != nil {
return nil, err
}
if req.sendCRC32C {
Expand All @@ -750,8 +750,8 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec
if req.predefinedACL != "" {
rawReq.DestinationPredefinedAcl = req.predefinedACL
}
if req.encryptionKey != nil {
rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.encryptionKey)
if req.dstObject.encryptionKey != nil {
rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
}

var obj *storagepb.Object
Expand Down
4 changes: 2 additions & 2 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec
}

call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq).Context(ctx)
if err := applyConds("ComposeFrom destination", -1, req.dstObject.conds, call); err != nil {
if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil {
return nil, err
}
if s.userProject != "" {
Expand All @@ -701,7 +701,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec
if req.predefinedACL != "" {
call.DestinationPredefinedAcl(req.predefinedACL)
}
if err := setEncryptionHeaders(call.Header(), req.encryptionKey, false); err != nil {
if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
return nil, err
}
var obj *raw.Object
Expand Down
2 changes: 1 addition & 1 deletion storage/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
defer func() { trace.EndSpan(ctx, err) }()

if o.c.tc != nil {
if o.c.useGRPC {
return o.newRangeReaderWithGRPC(ctx, offset, length)
}

Expand Down
12 changes: 11 additions & 1 deletion storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ type Client struct {

// tc is the transport-agnostic client implemented with either gRPC or HTTP.
tc storageClient
// useGRPC flags whether the client uses gRPC. This is needed while the
// integration piece is only partially complete.
// TODO: remove before merging to main.
useGRPC bool
}

// NewClient creates a new Google Cloud Storage client.
Expand Down Expand Up @@ -195,12 +199,18 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
return nil, fmt.Errorf("supplied endpoint %q is not valid: %v", ep, err)
}

tc, err := newHTTPStorageClient(ctx, withClientOptions(opts...))
if err != nil {
return nil, fmt.Errorf("storage: %v", err)
}

return &Client{
hc: hc,
raw: rawService,
scheme: u.Scheme,
readHost: u.Host,
creds: creds,
tc: tc,
}, nil
}

Expand All @@ -215,7 +225,7 @@ func newGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e
return nil, err
}

return &Client{tc: tc}, nil
return &Client{tc: tc, useGRPC: true}, nil
}

// Close closes the Client.
Expand Down
4 changes: 2 additions & 2 deletions storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
}
if !w.opened {
// gRPC client has been initialized - use gRPC to upload.
if w.o.c.tc != nil {
if w.o.c.useGRPC {
if err := w.openWriter(); err != nil {
return 0, err
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (w *Writer) Write(p []byte) (n int, err error) {
// can be retrieved by calling Attrs.
func (w *Writer) Close() error {
if !w.opened {
if w.o.c.tc != nil {
if w.o.c.useGRPC {
if err := w.openWriter(); err != nil {
return err
}
Expand Down