Skip to content

Commit

Permalink
chore(storage): RewriteObject implementation (#6313)
Browse files Browse the repository at this point in the history
* chore(storage): RewriteObject implementation

* address feedback

* refactor source/destination object types

* address feedback

* address feedback

* fix test
  • Loading branch information
noahdietz authored Jul 20, 2022
1 parent 0c563c8 commit e8d24c1
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 24 deletions.
35 changes: 17 additions & 18 deletions storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,34 +278,33 @@ type newRangeReaderParams struct {

type composeObjectRequest struct {
dstBucket string
dstObject composeDstObject
srcs []composeSrcObject
dstObject destinationObject
srcs []sourceObject
predefinedACL string
encryptionKey []byte
sendCRC32C bool
}

type composeSrcObject struct {
name string
gen int64
conds *Conditions
type sourceObject struct {
name string
bucket string
gen int64
conds *Conditions
encryptionKey []byte
}

type composeDstObject struct {
name string
conds *Conditions
attrs *ObjectAttrs // attrs to set on the destination object.
type destinationObject struct {
name string
bucket string
conds *Conditions
attrs *ObjectAttrs // attrs to set on the destination object.
encryptionKey []byte
keyName string
}

type rewriteObjectRequest struct {
srcBucket string
srcObject string
dstBucket string
dstObject string
dstKeyName string
attrs *ObjectAttrs
gen int64
conds *Conditions
srcObject sourceObject
dstObject destinationObject
predefinedACL string
token string
}
Expand Down
49 changes: 47 additions & 2 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,51 @@ func TestGetObjectEmulated(t *testing.T) {
})
}

// TestRewriteObjectEmulated uploads an object through the veneer client and
// then checks that a single RewriteObject call on the transport client copies
// it, within the same bucket, reporting completion and the full byte count.
func TestRewriteObjectEmulated(t *testing.T) {
	transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
		ctx := context.Background()

		// Populate test object.
		if _, err := client.CreateBucket(ctx, project, &BucketAttrs{Name: bucket}); err != nil {
			t.Fatalf("client.CreateBucket: %v", err)
		}
		srcName := fmt.Sprintf("testObject-%d", time.Now().Nanosecond())
		writer := veneerClient.Bucket(bucket).Object(srcName).NewWriter(ctx)
		if _, err := writer.Write(randomBytesToWrite); err != nil {
			t.Fatalf("failed to populate test object: %v", err)
		}
		if err := writer.Close(); err != nil {
			t.Fatalf("closing object: %v", err)
		}

		// Rewrite the freshly written object to a new name in the same bucket.
		source := sourceObject{
			bucket: bucket,
			name:   srcName,
			gen:    defaultGen,
		}
		destination := destinationObject{
			bucket: bucket,
			name:   fmt.Sprintf("copy-of-%s", srcName),
			attrs:  &ObjectAttrs{},
		}
		res, err := client.RewriteObject(ctx, &rewriteObjectRequest{
			dstObject: destination,
			srcObject: source,
		})
		if err != nil {
			t.Fatal(err)
		}
		if !res.done {
			t.Fatal("didn't finish writing!")
		}
		want := int64(len(randomBytesToWrite))
		if res.written != want {
			t.Errorf("Bytes written: got %d, want %d", res.written, want)
		}
	})
}

func TestUpdateObjectEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
Expand Down Expand Up @@ -986,11 +1031,11 @@ func TestComposeEmulated(t *testing.T) {
dstName := fmt.Sprintf("%d-object3", prefix)
req := composeObjectRequest{
dstBucket: bucket,
dstObject: composeDstObject{
dstObject: destinationObject{
name: dstName,
attrs: &ObjectAttrs{StorageClass: "COLDLINE"},
},
srcs: []composeSrcObject{
srcs: []sourceObject{
{name: srcNames[0]},
{name: srcNames[1]},
},
Expand Down
51 changes: 50 additions & 1 deletion storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,56 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec
return newObjectFromProto(obj), nil
}
// RewriteObject sends one RewriteObject RPC built from req and returns its
// result.
//
// req.token is forwarded as the rewrite token. The returned response reports
// whether the rewrite is done, the total bytes rewritten, the rewrite token
// returned by the service, and the resulting object resource.
func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
	s := callSettings(c.settings, opts...)

	// The destination bucket travels in DestinationBucket, so no bucket name
	// is passed when converting the destination attrs.
	// NOTE(review): presumably req.dstObject.attrs is always non-nil here —
	// confirm against callers.
	obj := req.dstObject.attrs.toProtoObject("")
	call := &storagepb.RewriteObjectRequest{
		SourceBucket:             bucketResourceName(globalProjectAlias, req.srcObject.bucket),
		SourceObject:             req.srcObject.name,
		RewriteToken:             req.token,
		DestinationBucket:        bucketResourceName(globalProjectAlias, req.dstObject.bucket),
		DestinationName:          req.dstObject.name,
		Destination:              obj,
		DestinationKmsKey:        req.dstObject.keyName,
		DestinationPredefinedAcl: req.predefinedACL,
	}

	// The userProject, whether source or destination project, is decided by the code calling the interface.
	if s.userProject != "" {
		ctx = setUserProjectMetadata(ctx, s.userProject)
	}
	if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
		return nil, err
	}
	if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil {
		return nil, err
	}

	// Destination encryption key goes in the common request params; the
	// source key is spread over the dedicated CopySource* fields.
	if len(req.dstObject.encryptionKey) > 0 {
		call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
	}
	if len(req.srcObject.encryptionKey) > 0 {
		srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey)
		call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm()
		call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes()
		call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes()
	}

	// res is captured by the closure so the value from the last attempt made
	// by run is the one used below.
	var res *storagepb.RewriteResponse
	retryCall := func() error {
		var err error
		res, err = c.raw.RewriteObject(ctx, call, s.gax...)
		return err
	}
	if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)); err != nil {
		return nil, err
	}

	return &rewriteObjectResponse{
		done:     res.GetDone(),
		written:  res.GetTotalBytesRewritten(),
		token:    res.GetRewriteToken(),
		resource: newObjectFromProto(res.GetResource()),
	}, nil
}

func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
Expand Down
50 changes: 49 additions & 1 deletion storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,55 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec
return newObject(obj), nil
}
// RewriteObject sends one Objects.Rewrite request built from req and returns
// its result.
//
// req.token, the destination KMS key name, the predefined ACL and the user
// project are only set on the call when non-empty. The returned response
// reports whether the rewrite is done, the total bytes rewritten, the rewrite
// token returned by the service, and the resulting object resource.
func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
	s := callSettings(c.settings, opts...)

	// The destination bucket is passed to Rewrite directly, so no bucket name
	// is passed when converting the destination attrs.
	// NOTE(review): presumably req.dstObject.attrs is always non-nil here —
	// confirm against callers.
	rawObject := req.dstObject.attrs.toRawObject("")
	call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject)

	call.Context(ctx).Projection("full")
	if req.token != "" {
		call.RewriteToken(req.token)
	}
	if req.dstObject.keyName != "" {
		call.DestinationKmsKeyName(req.dstObject.keyName)
	}
	if req.predefinedACL != "" {
		call.DestinationPredefinedAcl(req.predefinedACL)
	}
	if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
		return nil, err
	}
	if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil {
		return nil, err
	}
	if s.userProject != "" {
		call.UserProject(s.userProject)
	}
	// Set destination encryption headers.
	if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
		return nil, err
	}
	// Set source encryption headers.
	if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil {
		return nil, err
	}
	setClientHeader(call.Header())

	// res is captured by the closure so the value from the last attempt made
	// by run is the one used below.
	var res *raw.RewriteResponse
	retryCall := func() error {
		var err error
		res, err = call.Do()
		return err
	}
	if err := run(ctx, retryCall, s.retry, s.idempotent, setRetryHeaderHTTP(call)); err != nil {
		return nil, err
	}

	return &rewriteObjectResponse{
		done:     res.Done,
		written:  res.TotalBytesRewritten,
		token:    res.RewriteToken,
		resource: newObject(res.Resource),
	}, nil
}

func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
Expand Down
34 changes: 32 additions & 2 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,8 +1208,11 @@ func (o *ObjectAttrs) toProtoObject(b string) *storagepb.Object {
}

// For now, there are only globally unique buckets, and "_" is the alias
// project ID for such buckets.
b = bucketResourceName("_", b)
// project ID for such buckets. If the bucket is not provided, like in the
// destination ObjectAttrs of a Copy, do not attempt to format it.
if b != "" {
b = bucketResourceName(globalProjectAlias, b)
}

return &storagepb.Object{
Bucket: b,
Expand Down Expand Up @@ -1838,6 +1841,33 @@ func applySourceConds(gen int64, conds *Conditions, call *raw.ObjectsRewriteCall
return nil
}

// applySourceCondsProto copies the source generation and the source
// preconditions in conds onto a gRPC RewriteObjectRequest.
//
// A negative gen leaves SourceGeneration untouched, and a nil conds adds no
// preconditions. It returns the error from conds.validate, if any.
func applySourceCondsProto(gen int64, conds *Conditions, call *storagepb.RewriteObjectRequest) error {
	if gen >= 0 {
		call.SourceGeneration = gen
	}
	if conds == nil {
		return nil
	}
	if err := conds.validate("CopyTo source"); err != nil {
		return err
	}

	// Generation preconditions: the first one that is set wins.
	if conds.GenerationMatch != 0 {
		call.IfSourceGenerationMatch = proto.Int64(conds.GenerationMatch)
	} else if conds.GenerationNotMatch != 0 {
		call.IfSourceGenerationNotMatch = proto.Int64(conds.GenerationNotMatch)
	} else if conds.DoesNotExist {
		// DoesNotExist is expressed as a generation match against 0.
		call.IfSourceGenerationMatch = proto.Int64(0)
	}

	// Metageneration preconditions: the first one that is set wins.
	if conds.MetagenerationMatch != 0 {
		call.IfSourceMetagenerationMatch = proto.Int64(conds.MetagenerationMatch)
	} else if conds.MetagenerationNotMatch != 0 {
		call.IfSourceMetagenerationNotMatch = proto.Int64(conds.MetagenerationNotMatch)
	}
	return nil
}

// setConditionField sets a field on a *raw.WhateverCall.
// We can't use anonymous interfaces because the return type is
// different, since the field setters are builders.
Expand Down

0 comments on commit e8d24c1

Please sign in to comment.