diff --git a/storage/client.go b/storage/client.go index 7117d5820d79..ef443b755853 100644 --- a/storage/client.go +++ b/storage/client.go @@ -278,34 +278,33 @@ type newRangeReaderParams struct { type composeObjectRequest struct { dstBucket string - dstObject composeDstObject - srcs []composeSrcObject + dstObject destinationObject + srcs []sourceObject predefinedACL string encryptionKey []byte sendCRC32C bool } -type composeSrcObject struct { - name string - gen int64 - conds *Conditions +type sourceObject struct { + name string + bucket string + gen int64 + conds *Conditions + encryptionKey []byte } -type composeDstObject struct { - name string - conds *Conditions - attrs *ObjectAttrs // attrs to set on the destination object. +type destinationObject struct { + name string + bucket string + conds *Conditions + attrs *ObjectAttrs // attrs to set on the destination object. + encryptionKey []byte + keyName string } type rewriteObjectRequest struct { - srcBucket string - srcObject string - dstBucket string - dstObject string - dstKeyName string - attrs *ObjectAttrs - gen int64 - conds *Conditions + srcObject sourceObject + dstObject destinationObject predefinedACL string token string } diff --git a/storage/client_test.go b/storage/client_test.go index cb667e30f037..a32d5b9ef007 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -276,6 +276,51 @@ func TestGetObjectEmulated(t *testing.T) { }) } +func TestRewriteObjectEmulated(t *testing.T) { + transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { + // Populate test object. + _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + Name: bucket, + }) + if err != nil { + t.Fatalf("client.CreateBucket: %v", err) + } + src := ObjectAttrs{ + Bucket: bucket, + Name: fmt.Sprintf("testObject-%d", time.Now().Nanosecond()), + } + w := veneerClient.Bucket(bucket).Object(src.Name).NewWriter(context.Background()) + if _, err := w.Write(randomBytesToWrite); err != nil { + t.Fatalf("failed to populate test object: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("closing object: %v", err) + } + req := &rewriteObjectRequest{ + dstObject: destinationObject{ + bucket: bucket, + name: fmt.Sprintf("copy-of-%s", src.Name), + attrs: &ObjectAttrs{}, + }, + srcObject: sourceObject{ + bucket: bucket, + name: src.Name, + gen: defaultGen, + }, + } + got, err := client.RewriteObject(context.Background(), req) + if err != nil { + t.Fatal(err) + } + if !got.done { + t.Fatal("didn't finish writing!") + } + if want := int64(len(randomBytesToWrite)); got.written != want { + t.Errorf("Bytes written: got %d, want %d", got.written, want) + } + }) +} + func TestUpdateObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. @@ -986,11 +1031,11 @@ func TestComposeEmulated(t *testing.T) { dstName := fmt.Sprintf("%d-object3", prefix) req := composeObjectRequest{ dstBucket: bucket, - dstObject: composeDstObject{ + dstObject: destinationObject{ name: dstName, attrs: &ObjectAttrs{StorageClass: "COLDLINE"}, }, - srcs: []composeSrcObject{ + srcs: []sourceObject{ {name: srcNames[0]}, {name: srcNames[1]}, }, diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 4bf8f6baefb2..110baebb86fb 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -766,7 +766,56 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec return newObjectFromProto(obj), nil } func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { - return nil, errMethodNotSupported + s := callSettings(c.settings, opts...) + obj := req.dstObject.attrs.toProtoObject("") + call := &storagepb.RewriteObjectRequest{ + SourceBucket: bucketResourceName(globalProjectAlias, req.srcObject.bucket), + SourceObject: req.srcObject.name, + RewriteToken: req.token, + DestinationBucket: bucketResourceName(globalProjectAlias, req.dstObject.bucket), + DestinationName: req.dstObject.name, + Destination: obj, + DestinationKmsKey: req.dstObject.keyName, + DestinationPredefinedAcl: req.predefinedACL, + } + + // The userProject, whether source or destination project, is decided by the code calling the interface. + if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil { + return nil, err + } + if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil { + return nil, err + } + + if len(req.dstObject.encryptionKey) > 0 { + call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey) + } + if len(req.srcObject.encryptionKey) > 0 { + srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey) + call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm() + call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes() + call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes() + } + var res *storagepb.RewriteResponse + var err error + + retryCall := func() error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err } + + if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)); err != nil { + return nil, err + } + + r := &rewriteObjectResponse{ + done: res.GetDone(), + written: res.GetTotalBytesRewritten(), + token: res.GetRewriteToken(), + resource: newObjectFromProto(res.GetResource()), + } + + return r, nil } func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { diff --git a/storage/http_client.go b/storage/http_client.go index a1c26cda81a1..ffa2e3699436 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -716,7 +716,55 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec return newObject(obj), nil } func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) { - return nil, errMethodNotSupported + s := callSettings(c.settings, opts...) + rawObject := req.dstObject.attrs.toRawObject("") + call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject) + + call.Context(ctx).Projection("full") + if req.token != "" { + call.RewriteToken(req.token) + } + if req.dstObject.keyName != "" { + call.DestinationKmsKeyName(req.dstObject.keyName) + } + if req.predefinedACL != "" { + call.DestinationPredefinedAcl(req.predefinedACL) + } + if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil { + return nil, err + } + if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil { + return nil, err + } + if s.userProject != "" { + call.UserProject(s.userProject) + } + // Set destination encryption headers. + if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil { + return nil, err + } + // Set source encryption headers. + if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil { + return nil, err + } + var res *raw.RewriteResponse + var err error + setClientHeader(call.Header()) + + retryCall := func() error { res, err = call.Do(); return err } + + if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil { + return nil, err + } + + r := &rewriteObjectResponse{ + done: res.Done, + written: res.TotalBytesRewritten, + token: res.RewriteToken, + resource: newObject(res.Resource), + } + + return r, nil } func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { diff --git a/storage/storage.go b/storage/storage.go index 79ab04f60cd9..d6634e3dbca1 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1208,8 +1208,11 @@ func (o *ObjectAttrs) toProtoObject(b string) *storagepb.Object { } // For now, there are only globally unique buckets, and "_" is the alias - // project ID for such buckets. - b = bucketResourceName("_", b) + // project ID for such buckets. If the bucket is not provided, like in the + // destination ObjectAttrs of a Copy, do not attempt to format it. + if b != "" { + b = bucketResourceName(globalProjectAlias, b) + } return &storagepb.Object{ Bucket: b, @@ -1838,6 +1841,33 @@ func applySourceConds(gen int64, conds *Conditions, call *raw.ObjectsRewriteCall return nil } +func applySourceCondsProto(gen int64, conds *Conditions, call *storagepb.RewriteObjectRequest) error { + if gen >= 0 { + call.SourceGeneration = gen + } + if conds == nil { + return nil + } + if err := conds.validate("CopyTo source"); err != nil { + return err + } + switch { + case conds.GenerationMatch != 0: + call.IfSourceGenerationMatch = proto.Int64(conds.GenerationMatch) + case conds.GenerationNotMatch != 0: + call.IfSourceGenerationNotMatch = proto.Int64(conds.GenerationNotMatch) + case conds.DoesNotExist: + call.IfSourceGenerationMatch = proto.Int64(0) + } + switch { + case conds.MetagenerationMatch != 0: + call.IfSourceMetagenerationMatch = proto.Int64(conds.MetagenerationMatch) + case conds.MetagenerationNotMatch != 0: + call.IfSourceMetagenerationNotMatch = proto.Int64(conds.MetagenerationNotMatch) + } + return nil +} + // setConditionField sets a field on a *raw.WhateverCall. // We can't use anonymous interfaces because the return type is // different, since the field setters are builders.