Skip to content

Commit 96dbb6c

Browse files
authored
fix(storage/append): Report progress for appends. (#11503)
The appendable writer should report progress, as the resumable writer does.
1 parent 90edd74 commit 96dbb6c

File tree

2 files changed

+26
-5
lines changed

2 files changed

+26
-5
lines changed

storage/client_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -1000,13 +1000,23 @@ func TestOpenAppendableWriterMultipleFlushesEmulated(t *testing.T) {
10001000
w.Append = true
10011001
// This should chunk the request into three separate flushes to storage.
10021002
w.ChunkSize = MiB
1003+
var lastReportedOffset int64
1004+
w.ProgressFunc = func(offset int64) {
1005+
if offset != lastReportedOffset+MiB {
1006+
t.Errorf("incorrect progress report: got %d; want %d", offset, lastReportedOffset+MiB)
1007+
}
1008+
lastReportedOffset = offset
1009+
}
10031010
_, err = w.Write(randomBytes3MiB)
10041011
if err != nil {
10051012
t.Fatalf("writing test data: got %v; want ok", err)
10061013
}
10071014
if err := w.Close(); err != nil {
10081015
t.Fatalf("closing test data writer: got %v; want ok", err)
10091016
}
1017+
if lastReportedOffset != 3*MiB {
1018+
t.Errorf("incorrect final progress report: got %d; want %d", lastReportedOffset, 3*MiB)
1019+
}
10101020

10111021
if diff := cmp.Diff(w.Attrs().Name, objName); diff != "" {
10121022
t.Fatalf("Resulting object name: got(-), want(+):\n%s", diff)

storage/grpc_writer.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type gRPCAppendBidiWriteBufferSender struct {
3939
objectChecksums *storagepb.ObjectChecksums
4040

4141
forceFirstMessage bool
42+
progress func(int64)
4243
flushOffset int64
4344

4445
// Fields used to report responses from the receive side of the stream
@@ -62,6 +63,7 @@ func (w *gRPCWriter) newGRPCAppendBidiWriteBufferSender() (*gRPCAppendBidiWriteB
6263
},
6364
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
6465
forceFirstMessage: true,
66+
progress: w.progress,
6567
}
6668
return s, nil
6769
}
@@ -246,26 +248,35 @@ func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offs
246248
if s.recvErr != io.EOF {
247249
return nil, s.recvErr
248250
}
251+
if obj.GetSize() > s.flushOffset {
252+
s.flushOffset = obj.GetSize()
253+
s.progress(s.flushOffset)
254+
}
249255
return
250256
}
251257

252258
if flush {
253259
// We don't necessarily expect multiple responses for a single flush, but
254260
// this allows the server to send multiple responses if it wants to.
255-
for s.flushOffset < offset+int64(len(buf)) {
261+
flushOffset := s.flushOffset
262+
for flushOffset < offset+int64(len(buf)) {
256263
resp, ok := <-s.recvs
257264
if !ok {
258265
return nil, s.recvErr
259266
}
260267
pSize := resp.GetPersistedSize()
261268
rSize := resp.GetResource().GetSize()
262-
if s.flushOffset < pSize {
263-
s.flushOffset = pSize
269+
if flushOffset < pSize {
270+
flushOffset = pSize
264271
}
265-
if s.flushOffset < rSize {
266-
s.flushOffset = rSize
272+
if flushOffset < rSize {
273+
flushOffset = rSize
267274
}
268275
}
276+
if s.flushOffset < flushOffset {
277+
s.flushOffset = flushOffset
278+
s.progress(s.flushOffset)
279+
}
269280
}
270281

271282
return

0 commit comments

Comments
 (0)