Skip to content

Commit 9e359f0

Browse files
fix(storage): modify the callback of mrd to return length of data read instead of limit. (#11687)
Currently, MRD takes a callback in each Add call. That callback receives the offset, length, and error for that Add call, so its signature is func(int64, int64, error). In some scenarios the amount of data read can differ from the limit that was set. For example, if only 50 bytes are available to read and 100 bytes are requested (length/limit), only 50 bytes are returned. The callback has therefore been changed to report the amount of data actually read instead of the limit that was set. Also added Go doc for the callback.
1 parent d34c3b9 commit 9e359f0

File tree

3 files changed

+82
-26
lines changed

3 files changed

+82
-26
lines changed

storage/grpc_client.go

+29-26
Original file line numberDiff line numberDiff line change
@@ -1062,12 +1062,13 @@ func contextMetadataFromBidiReadObject(req *storagepb.BidiReadObjectRequest) []s
10621062
}
10631063

10641064
type rangeSpec struct {
1065-
readID int64
1066-
writer io.Writer
1067-
offset int64
1068-
limit int64
1069-
bytesWritten int64
1070-
callback func(int64, int64, error)
1065+
readID int64
1066+
writer io.Writer
1067+
offset int64
1068+
limit int64
1069+
currentBytesWritten int64
1070+
totalBytesWritten int64
1071+
callback func(int64, int64, error)
10711072
}
10721073

10731074
func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (mr *MultiRangeDownloader, err error) {
@@ -1202,7 +1203,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12021203
rr.mu.Lock()
12031204
if len(rr.mp) != 0 {
12041205
for key := range rr.mp {
1205-
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].limit, fmt.Errorf("stream closed early"))
1206+
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].totalBytesWritten, fmt.Errorf("stream closed early"))
12061207
delete(rr.mp, key)
12071208
}
12081209
}
@@ -1295,21 +1296,22 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12951296
}
12961297
_, err = rr.mp[id].writer.Write(val.GetChecksummedData().GetContent())
12971298
if err != nil {
1298-
rr.mp[id].callback(rr.mp[id].offset, rr.mp[id].limit, err)
1299+
rr.mp[id].callback(rr.mp[id].offset, rr.mp[id].totalBytesWritten, err)
12991300
rr.activeTask--
13001301
delete(rr.mp, id)
13011302
} else {
13021303
rr.mp[id] = rangeSpec{
1303-
readID: rr.mp[id].readID,
1304-
writer: rr.mp[id].writer,
1305-
offset: rr.mp[id].offset,
1306-
limit: rr.mp[id].limit,
1307-
bytesWritten: rr.mp[id].bytesWritten + int64(len(val.GetChecksummedData().GetContent())),
1308-
callback: rr.mp[id].callback,
1304+
readID: rr.mp[id].readID,
1305+
writer: rr.mp[id].writer,
1306+
offset: rr.mp[id].offset,
1307+
limit: rr.mp[id].limit,
1308+
currentBytesWritten: rr.mp[id].currentBytesWritten + int64(len(val.GetChecksummedData().GetContent())),
1309+
totalBytesWritten: rr.mp[id].totalBytesWritten + int64(len(val.GetChecksummedData().GetContent())),
1310+
callback: rr.mp[id].callback,
13091311
}
13101312
}
13111313
if val.GetRangeEnd() {
1312-
rr.mp[id].callback(rr.mp[id].offset, rr.mp[id].limit, nil)
1314+
rr.mp[id].callback(rr.mp[id].offset, rr.mp[id].totalBytesWritten, nil)
13131315
rr.activeTask--
13141316
delete(rr.mp, id)
13151317
}
@@ -1340,7 +1342,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
13401342
if err != nil {
13411343
rr.mu.Lock()
13421344
for key := range rr.mp {
1343-
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].limit, err)
1345+
rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].totalBytesWritten, err)
13441346
delete(rr.mp, key)
13451347
}
13461348
// In case we hit a permanent error, delete entries from map and remove active tasks.
@@ -1388,12 +1390,13 @@ func getActiveRange(r *gRPCBidiReader) []rangeSpec {
13881390
var activeRange []rangeSpec
13891391
for k, v := range r.mp {
13901392
activeRange = append(activeRange, rangeSpec{
1391-
readID: k,
1392-
writer: v.writer,
1393-
offset: (v.offset + v.bytesWritten),
1394-
limit: v.limit - v.bytesWritten,
1395-
callback: v.callback,
1396-
bytesWritten: 0,
1393+
readID: k,
1394+
writer: v.writer,
1395+
offset: (v.offset + v.currentBytesWritten),
1396+
limit: v.limit - v.currentBytesWritten,
1397+
callback: v.callback,
1398+
currentBytesWritten: 0,
1399+
totalBytesWritten: v.totalBytesWritten,
13971400
})
13981401
r.mp[k] = activeRange[len(activeRange)-1]
13991402
}
@@ -1443,22 +1446,22 @@ func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback fu
14431446
mr.mu.Unlock()
14441447

14451448
if offset > objectSize {
1446-
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", objectSize))
1449+
callback(offset, 0, fmt.Errorf("offset larger than size of object: %v", objectSize))
14471450
return
14481451
}
14491452
if limit < 0 {
1450-
callback(offset, limit, fmt.Errorf("limit can't be negative"))
1453+
callback(offset, 0, fmt.Errorf("limit can't be negative"))
14511454
return
14521455
}
14531456
mr.mu.Lock()
14541457
currentID := (*mr).readID
14551458
(*mr).readID++
14561459
if !mr.done {
1457-
spec := rangeSpec{readID: currentID, writer: output, offset: offset, limit: limit, bytesWritten: 0, callback: callback}
1460+
spec := rangeSpec{readID: currentID, writer: output, offset: offset, limit: limit, currentBytesWritten: 0, totalBytesWritten: 0, callback: callback}
14581461
mr.activeTask++
14591462
mr.data <- []rangeSpec{spec}
14601463
} else {
1461-
callback(offset, limit, fmt.Errorf("stream is closed, can't add range"))
1464+
callback(offset, 0, fmt.Errorf("stream is closed, can't add range"))
14621465
}
14631466
mr.mu.Unlock()
14641467
}

storage/integration_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,55 @@ func TestIntegration_MultiRangeDownloader(t *testing.T) {
402402
})
403403
}
404404

405+
// TestIntegration_MRDCallbackReturnsDataLength verifies that the callback reports the number of
406+
// bytes actually read, rather than the requested limit, when the limit exceeds the object size.
407+
func TestIntegration_MRDCallbackReturnsDataLength(t *testing.T) {
408+
multiTransportTest(skipHTTP("gRPC implementation specific test"), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
409+
content := make([]byte, 1000)
410+
rand.New(rand.NewSource(0)).Read(content)
411+
objName := "MRDCallback"
412+
413+
// Upload test data.
414+
obj := client.Bucket(bucket).Object(objName)
415+
if err := writeObject(ctx, obj, "text/plain", content); err != nil {
416+
t.Fatal(err)
417+
}
418+
defer func() {
419+
if err := obj.Delete(ctx); err != nil {
420+
log.Printf("failed to delete test object: %v", err)
421+
}
422+
}()
423+
reader, err := obj.NewMultiRangeDownloader(ctx)
424+
if err != nil {
425+
t.Fatalf("NewMultiRangeDownloader: %v", err)
426+
}
427+
var res multiRangeDownloaderOutput
428+
callback := func(x, y int64, err error) {
429+
res.offset = x
430+
res.limit = y
431+
res.err = err
432+
}
433+
// Read All At Once.
434+
offset := 0
435+
limit := 10000
436+
reader.Add(&res.buf, int64(offset), int64(limit), callback)
437+
reader.Wait()
438+
if res.limit != 1000 {
439+
t.Errorf("Error in callback want data length 1000, got: %v", res.limit)
440+
}
441+
if !bytes.Equal(res.buf.Bytes(), content) {
442+
t.Errorf("Error in read range offset %v, limit %v, got: %v; want: %v",
443+
offset, limit, res.buf.Bytes(), content)
444+
}
445+
if res.err != nil {
446+
t.Errorf("read range %v to %v : %v", res.offset, 10000, res.err)
447+
}
448+
if err = reader.Close(); err != nil {
449+
t.Fatalf("Error while closing reader %v", err)
450+
}
451+
})
452+
}
453+
405454
// TestIntegration_ReadSameFileConcurrentlyUsingMultiRangeDownloader tests for potential deadlocks
406455
// or race conditions when multiple goroutines call Add() concurrently on the same MRD multiple times.
407456
func TestIntegration_ReadSameFileConcurrentlyUsingMultiRangeDownloader(t *testing.T) {

storage/reader.go

+4
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,10 @@ type multiRangeDownloader interface {
411411
// This will initiate the read range but is non-blocking; call callback to
412412
// process the result. Add is thread-safe and can be called simultaneously
413413
// from different goroutines.
414+
//
415+
// Callback will be called with the offset, length of data read, and error
416+
// of the read. Note that the length of the data read may be less than the
417+
// requested length if the end of the object is reached.
414418
func (mrd *MultiRangeDownloader) Add(output io.Writer, offset, length int64, callback func(int64, int64, error)) {
415419
mrd.reader.add(output, offset, length, callback)
416420
}

0 commit comments

Comments
 (0)