Skip to content

Commit 9ccca70

Browse files
committed
fix additional race and unblock tests
1 parent cccafae commit 9ccca70

File tree

4 files changed

+10
-15
lines changed

4 files changed

+10
-15
lines changed

storage/dataflux/integration_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ func TestMain(m *testing.M) {
7070

7171
// Lists the all the objects in the bucket.
7272
func TestIntegration_NextBatch_All(t *testing.T) {
73-
t.Skip("#11198")
7473
if testing.Short() {
7574
t.Skip("Integration tests skipped in short mode")
7675
}
@@ -97,7 +96,6 @@ func TestIntegration_NextBatch_All(t *testing.T) {
9796
}
9897

9998
func TestIntegration_NextBatch(t *testing.T) {
100-
t.Skip("#11196")
10199
// Accessing public bucket to list large number of files in batches.
102100
// See https://cloud.google.com/storage/docs/public-datasets/landsat
103101
if testing.Short() {

storage/dataflux/range_splitter.go

+4
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,8 @@ func (rs *rangeSplitter) convertStringRangeToMinimalIntRange(
328328

329329
// charPosition returns the index of the character in the alphabet set.
330330
func (rs *rangeSplitter) charPosition(ch rune) (int, error) {
331+
rs.mu.Lock() // Acquire the lock
332+
defer rs.mu.Unlock() // Release the lock when the function exits
331333
if idx, ok := rs.alphabetMap[ch]; ok {
332334
return idx, nil
333335
}
@@ -337,6 +339,8 @@ func (rs *rangeSplitter) charPosition(ch rune) (int, error) {
337339
// convertRangeStringToArray transforms the range string into a rune slice while
338340
// verifying the presence of each character in the alphabets.
339341
func (rs *rangeSplitter) convertRangeStringToArray(rangeString string) ([]rune, error) {
342+
rs.mu.Lock() // Acquire the lock
343+
defer rs.mu.Unlock() // Release the lock when the function exits
340344
for _, char := range rangeString {
341345
if _, exists := rs.alphabetMap[char]; !exists {
342346
return nil, fmt.Errorf("character %c in range string %q is not found in the alphabet array", char, rangeString)

storage/dataflux/worksteal.go

+6-12
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ func (c *Lister) workstealListing(ctx context.Context) ([]*storage.ObjectAttrs,
7777
if err != nil {
7878
return nil, fmt.Errorf("creating new range splitter: %w", err)
7979
}
80-
8180
g, ctx := errgroup.WithContext(ctx)
8281
// Initialize all workers as idle.
8382
for i := 0; i < c.parallelism; i++ {
@@ -126,17 +125,12 @@ func (w *worker) doWorkstealListing(ctx context.Context) error {
126125
// If a worker is idle, sleep for a while before checking the next update.
127126
// Worker status is changed to active when it finds work in range channel.
128127
if w.status == idle {
129-
if len(w.lister.ranges) == 0 {
130-
time.Sleep(sleepDurationWhenIdle)
128+
select {
129+
case newRange := <-w.lister.ranges:
130+
<-w.idleChannel
131+
w.updateWorker(newRange.startRange, newRange.endRange, active)
132+
case <-time.After(sleepDurationWhenIdle):
131133
continue
132-
} else {
133-
select {
134-
case newRange := <-w.lister.ranges:
135-
<-w.idleChannel
136-
w.updateWorker(newRange.startRange, newRange.endRange, active)
137-
case <-time.After(sleepDurationWhenIdle):
138-
continue
139-
}
140134
}
141135
}
142136
// Active worker to list next page of objects within the range
@@ -157,7 +151,7 @@ func (w *worker) doWorkstealListing(ctx context.Context) error {
157151

158152
// If listing not complete and idle workers are available, split the range
159153
// and give half of work to idle worker.
160-
for len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
154+
if len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
161155
// Split range and upload half of work for idle worker.
162156
splitPoint, err := w.rangesplitter.splitRange(w.startRange, w.endRange, 1)
163157
if err != nil {

storage/dataflux/worksteal_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
)
2323

2424
func TestWorkstealListingEmulated(t *testing.T) {
25-
t.Skip("https://github.com/googleapis/google-cloud-go/issues/11205")
2625
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client *storage.Client) {
2726

2827
attrs := &storage.BucketAttrs{

0 commit comments

Comments
 (0)