Commit 0bb3434

fix(storage): fix adding multiple ranges on a stream with the same read id (#11584)
Issue:
1. When a read add call arrives, the range is recorded in the map (`mr.mp[curentID] = spec`).
2. If the server returns an error at that same moment, the client enters the retry loop and shuts down the stream manager goroutine.
3. The retry loop sees this newly added map entry and retries it as well.
4. As a result, the same read ID is sent to the server twice: once from the add call and once from the retry.

Fix:
1. Stop inserting the new read ID into the map inside the add call, since that is what allowed the duplicate sends described above.
2. Ignore server responses for read IDs that are not found in the map; the user is already notified through the callback when an entry is missing.
1 parent d14e91c commit 0bb3434
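For illustration only, the following is a minimal, self-contained Go sketch of the sequence described in the commit message. The names (`manager`, `fakeStream`, `add`, `retry`) and the simplified `rangeSpec` are hypothetical stand-ins, not the storage client's actual types; it only shows how inserting into the shared map inside the add call lets a retry triggered by a server error re-send the same read ID.

package main

import (
	"fmt"
	"sync"
)

// Hypothetical, simplified stand-ins for the real types; this sketches the
// duplicate-send sequence from the commit message, not the client code.
type rangeSpec struct{ readID int64 }

type fakeStream struct {
	mu   sync.Mutex
	sent []int64 // read IDs the "server" has received
}

func (s *fakeStream) send(spec rangeSpec) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sent = append(s.sent, spec.readID)
}

type manager struct {
	mu     sync.Mutex
	mp     map[int64]rangeSpec
	nextID int64
	stream *fakeStream
}

// add models the old behaviour: the spec is put in the shared map and also
// sent on the normal path.
func (m *manager) add() {
	m.mu.Lock()
	id := m.nextID
	m.nextID++
	spec := rangeSpec{readID: id}
	m.mp[id] = spec // old code: the entry is now in the map that retry re-sends
	m.mu.Unlock()

	m.stream.send(spec)
}

// retry models the retry loop after a server error: every entry still in the
// map is re-sent, including one that add inserted just before the retry ran.
func (m *manager) retry() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, spec := range m.mp {
		m.stream.send(spec)
	}
}

func main() {
	m := &manager{mp: map[int64]rangeSpec{}, stream: &fakeStream{}}

	m.add()   // read ID 0 goes into the map and is sent
	m.retry() // a server error triggers a retry, which re-sends read ID 0

	fmt.Println("read IDs sent:", m.stream.sent) // [0 0] — the same ID twice
}

Removing the map insertion from the add path, as the diff below does in `add`, means the retry loop no longer sees a spec that the normal send path is about to deliver.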

1 file changed: +15 −5


storage/grpc_client.go (+15 −5)
@@ -1206,6 +1206,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
                 delete(rr.mp, key)
             }
         }
+        rr.activeTask = 0
         rr.mu.Unlock()
         return
     case currentSpec = <-rr.data:
@@ -1287,6 +1288,11 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
         for _, val := range arr {
             id := val.GetReadRange().GetReadId()
             rr.mu.Lock()
+            _, ok := rr.mp[id]
+            if !ok {
+                // it's ok to ignore responses for read_id not in map as user would have been notified by callback.
+                continue
+            }
             _, err = rr.mp[id].writer.Write(val.GetChecksummedData().GetContent())
             if err != nil {
                 rr.mp[id].callback(rr.mp[id].offset, rr.mp[id].limit, err)
@@ -1337,6 +1343,8 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
                 rr.mp[key].callback(rr.mp[key].offset, rr.mp[key].limit, err)
                 delete(rr.mp, key)
             }
+            // In case we hit an permanent error, delete entries from map and remove active tasks.
+            rr.activeTask = 0
             rr.mu.Unlock()
             rr.close()
         } else {
@@ -1443,11 +1451,10 @@ func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback fu
         return
     }
     mr.mu.Lock()
-    curentID := (*mr).readID
+    currentID := (*mr).readID
     (*mr).readID++
     if !mr.done {
-        spec := rangeSpec{readID: curentID, writer: output, offset: offset, limit: limit, bytesWritten: 0, callback: callback}
-        mr.mp[curentID] = spec
+        spec := rangeSpec{readID: currentID, writer: output, offset: offset, limit: limit, bytesWritten: 0, callback: callback}
         mr.activeTask++
         mr.data <- []rangeSpec{spec}
     } else {
@@ -1458,12 +1465,15 @@ func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback fu
 
 func (mr *gRPCBidiReader) wait() {
     mr.mu.Lock()
-    keepWaiting := len(mr.mp) != 0 && mr.activeTask != 0
+    // we should wait until there is active task or an entry in the map.
+    // there can be a scenario we have nothing in map for a moment or too but still have active task.
+    // hence in case we have permanent errors we reduce active task to 0 so that this does not block wait.
+    keepWaiting := len(mr.mp) != 0 || mr.activeTask != 0
     mr.mu.Unlock()
 
     for keepWaiting {
         mr.mu.Lock()
-        keepWaiting = len(mr.mp) != 0 && mr.activeTask != 0
+        keepWaiting = len(mr.mp) != 0 || mr.activeTask != 0
         mr.mu.Unlock()
     }
 }
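As a side note on the `wait` change above: with `&&`, `keepWaiting` becomes false as soon as either the map is empty or there are no active tasks, so `wait` could return while work was still outstanding; with `||` it keeps waiting until both are drained, and the new `rr.activeTask = 0` on permanent errors prevents it from waiting forever. A tiny illustration of the two conditions with hypothetical values (not client code):

package main

import "fmt"

func main() {
	// Snapshot where the map happens to be empty but one task is still active.
	lenMp, activeTask := 0, 1

	fmt.Println(lenMp != 0 && activeTask != 0) // false — wait would stop too early
	fmt.Println(lenMp != 0 || activeTask != 0) // true  — keep waiting for the task
}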
