Skip to content

Commit

Permalink
chore(storage): fix checksums for gRPC uploads
Browse files Browse the repository at this point in the history
There was an API change so that checksums can now only
be provided by the StartResumableUpload request rather than
while uploading. Send the checksum at this stage instead.
  • Loading branch information
tritone committed Dec 27, 2022
1 parent ca3c0b3 commit 7b848ac
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
43 changes: 22 additions & 21 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,11 +1496,29 @@ func (w *gRPCWriter) startResumableUpload() error {
if err != nil {
return err
}
req := &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
}
// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
if w.sendCRC32C {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(w.attrs.CRC32C),
}
}
if len(w.attrs.MD5) != 0 {
if cs := req.GetObjectChecksums(); cs == nil {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Md5Hash: w.attrs.MD5,
}
} else {
cs.Md5Hash = w.attrs.MD5
}
}
return run(w.ctx, func() error {
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
})
upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
w.upid = upres.GetUploadId()
return err
}, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx))
Expand Down Expand Up @@ -1587,23 +1605,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
}

// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
if w.sendCRC32C {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(w.attrs.CRC32C),
}
}
if len(w.attrs.MD5) != 0 {
if cs := req.GetObjectChecksums(); cs == nil {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Md5Hash: w.attrs.MD5,
}
} else {
cs.Md5Hash = w.attrs.MD5
}
}
}

err = w.stream.Send(req)
Expand Down
11 changes: 4 additions & 7 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ func initTransportClients(ctx context.Context, t *testing.T, opts ...option.Clie
// of an existing bucket to use, a bucket name to use for bucket creation, and
// the client to use.
func multiTransportTest(ctx context.Context, t *testing.T,
test func(*testing.T, context.Context, string, string, *Client),
opts ...option.ClientOption) {
test func(*testing.T, context.Context, string, string, *Client),
opts ...option.ClientOption) {
for transport, client := range initTransportClients(ctx, t, opts...) {
t.Run(transport, func(t *testing.T) {
defer client.Close()
Expand Down Expand Up @@ -1082,9 +1082,6 @@ func TestIntegration_MultiMessageWriteGRPC(t *testing.T) {

func TestIntegration_MultiChunkWrite(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
if bucket == grpcBucketName {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/7033")
}
h := testHelper{t}
obj := client.Bucket(bucket).Object(uidSpace.New()).Retryer(WithPolicy(RetryAlways))
defer h.mustDeleteObject(obj)
Expand Down Expand Up @@ -5150,8 +5147,8 @@ func retryOnTransient400and403(err error) bool {
var e *googleapi.Error
var ae *apierror.APIError
return ShouldRetry(err) ||
/* http */ errors.As(err, &e) && (e.Code == 400 || e.Code == 403) ||
/* grpc */ errors.As(err, &ae) && (ae.GRPCStatus().Code() == codes.InvalidArgument || ae.GRPCStatus().Code() == codes.PermissionDenied)
/* http */ errors.As(err, &e) && (e.Code == 400 || e.Code == 403) ||
/* grpc */ errors.As(err, &ae) && (ae.GRPCStatus().Code() == codes.InvalidArgument || ae.GRPCStatus().Code() == codes.PermissionDenied)
}

func skipGRPC(reason string) context.Context {
Expand Down

0 comments on commit 7b848ac

Please sign in to comment.