ingest: add long polling to TestConcurrentFetchers (#9971)
* ingest: add long polling to TestConcurrentFetchers

Some of the tests assume that we'd fetch all produced records in one go. The Kafka protocol doesn't guarantee that, and sometimes kfake also returns fewer records than were produced. This means that the tests would sometimes finish early, before we'd fetched all records.

This PR introduces `longPollFetches`, which loops over `fetchers.PollFetches` until a timeout expires or until we've fetched the expected number of records.
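Condensed from the diff below, the typical change in each test looks like this:

```go
// Before: assumes a single poll returns everything that was produced (flaky).
fetches, _ := fetchers.PollFetches(ctx)
assert.Equal(t, fetches.NumRecords(), 5)

// After: keep polling until we've seen the expected count or we time out.
fetches := longPollFetches(fetchers, 5, 2*time.Second)
assert.Equal(t, fetches.NumRecords(), 5)
```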

I set the timeout to 2s because of the `MinBytesMaxWait` of 1s in tests ([code](https://github.com/grafana/mimir/blob/96c92c99ef5599d4507c2aa9f6d7ddf42d5beec2/pkg/storage/ingest/fetcher_test.go#L1373)). In the worst case the fetch plays out as follows (see the toy model after this list):

* fetcher receives 1 record (1st request)
* fetcher sends the `fetchResult` to `fetchWant.result`
* fetcher receives 2 records (2nd request)
* fetcher can't send the result to `fetchWant.result` because `start()` isn't receiving on the channel; `start()` is trying to send to `orderedFetches`
* fetcher starts another (3rd) attempt; the attempt takes 1s (`MaxWaitMillis` in the Fetch request)
* client calls `PollFetches`, reads from `orderedFetches`; receives the first `Fetches` with 1 record; `start()` goes back to waiting on `fetchWant.result`
* client calls `PollFetches` again; `start()` is still waiting on `fetchWant.result`
* fetcher receives the response to the 3rd request with no records; sends the 2 buffered records from the 2nd request
* client receives 2 records
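A toy model of this handoff, with unbuffered channels standing in for `fetchWant.result` and `orderedFetches` (the goroutines, payloads, and timings are illustrative stand-ins, not the real fetcher code):

```go
package main

import (
	"fmt"
	"time"
)

const maxWait = time.Second // stands in for MaxWaitMillis in the Fetch request

func main() {
	results := make(chan int)        // stands in for fetchWant.result
	orderedFetches := make(chan int) // what PollFetches reads from

	// start(): forwards each fetch result to the consumer, one at a time.
	go func() {
		for r := range results {
			orderedFetches <- r
		}
	}()

	// Fetcher: issues fetch attempts; when start() can't accept a result,
	// it keeps the result buffered and issues another attempt.
	go func() {
		results <- 1  // 1st request: 1 record; start() takes it immediately
		buffered := 2 // 2nd request: 2 records
		for {
			select {
			case results <- buffered: // hand off the buffered records
				return
			default: // start() is busy trying to send to orderedFetches
				time.Sleep(maxWait) // 3rd request: returns empty after MaxWaitMillis
			}
		}
	}()

	begin := time.Now()
	time.Sleep(50 * time.Millisecond)                // the client hasn't polled yet
	fmt.Println(<-orderedFetches, time.Since(begin)) // 1 record, almost immediately
	fmt.Println(<-orderedFetches, time.Since(begin)) // 2 records, only after ~1s
}
```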

Another solution, instead of waiting for 2s, would be to do the merging of results in `start()` itself, for example.
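Sketched below as a nil-channel select; `forwardMerging`, the `[]int` payloads, and the channel parameters are hypothetical stand-ins for the fetcher's internals:

```go
package ingest

// forwardMerging is a sketch of the alternative: instead of blocking on a
// slow consumer, the forwarding loop merges results that arrive in the
// meantime, so a single PollFetches call later observes all of them at once.
func forwardMerging(results <-chan []int, ordered chan<- []int) {
	var buffered []int
	var out chan<- []int // nil: the send case stays disabled until we have data
	for {
		select {
		case r, ok := <-results:
			if !ok {
				return
			}
			buffered = append(buffered, r...) // merge instead of blocking
			out = ordered                     // enable the send case
		case out <- buffered:
			buffered, out = nil, nil // handed off; disable the send case again
		}
	}
}
```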

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Undo testing logger

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add 2s to remaining places

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Use testing logger

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Don't immediately abort test when there are some buffered records

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Increase timeout

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Nov 21, 2024
1 parent fbebaf8 commit d9c223f
Showing 1 changed file with 77 additions and 101 deletions.
178 changes: 77 additions & 101 deletions pkg/storage/ingest/fetcher_test.go
@@ -352,7 +352,7 @@ func TestConcurrentFetchers(t *testing.T) {

fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, 0)

fetches, _ := fetchers.PollFetches(ctx)
fetches := longPollFetches(fetchers, 5, 2*time.Second)
assert.Equal(t, fetches.NumRecords(), 5)

// We expect no more records returned by PollFetches() and no buffered records.
@@ -375,7 +375,7 @@ func TestConcurrentFetchers(t *testing.T) {
produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
}

fetches, _ := fetchers.PollFetches(ctx)
fetches := longPollFetches(fetchers, 3, 2*time.Second)
assert.Equal(t, fetches.NumRecords(), 3)

// We expect no more records returned by PollFetches() and no buffered records.
@@ -448,11 +448,8 @@ func TestConcurrentFetchers(t *testing.T) {
}

// Consume all expected records.
consumedRecords := 0
for consumedRecords < 10 {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches := longPollFetches(fetchers, 10, 2*time.Second)
consumedRecords := fetches.NumRecords()
assert.Equal(t, 10, consumedRecords)

// We expect no more records returned by PollFetches() and no buffered records.
@@ -481,11 +478,8 @@ func TestConcurrentFetchers(t *testing.T) {
produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
}

var totalRecords int
for totalRecords < 20 {
fetches, _ := fetchers.PollFetches(ctx)
totalRecords += fetches.NumRecords()
}
fetches := longPollFetches(fetchers, 20, 2*time.Second)
totalRecords := fetches.NumRecords()

assert.Equal(t, 20, totalRecords)

@@ -522,12 +516,10 @@ func TestConcurrentFetchers(t *testing.T) {

const expectedRecords = 5
fetchedRecordsContents := make([]string, 0, expectedRecords)
for len(fetchedRecordsContents) < expectedRecords {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value))
})
}
fetches := longPollFetches(fetchers, expectedRecords, 2*time.Second)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value))
})

assert.Equal(t, []string{
"record-4",
@@ -566,13 +558,11 @@ func TestConcurrentFetchers(t *testing.T) {

// Poll for fetches and verify
fetchedRecords := make([]string, 0, recordsPerRound)
for len(fetchedRecords) < recordsPerRound {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, string(r.Value))
t.Log("fetched", r.Offset, string(r.Value))
})
}
fetches := longPollFetches(fetchers, recordsPerRound, 2*time.Second)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, string(r.Value))
t.Log("fetched", r.Offset, string(r.Value))
})

// Verify fetched records
assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round)
@@ -599,11 +589,7 @@ func TestConcurrentFetchers(t *testing.T) {
producedOffset := produceRecord(ctx, t, client, topicName, partitionID, record)
// verify that the record is fetched.

var fetches kgo.Fetches
require.Eventually(t, func() bool {
fetches, _ = fetchers.PollFetches(ctx)
return len(fetches.Records()) == 1
}, 5*time.Second, 100*time.Millisecond)
fetches := longPollFetches(fetchers, 1, 5*time.Second)

require.Equal(t, fetches.Records()[0].Value, record)
require.Equal(t, fetches.Records()[0].Offset, producedOffset)
@@ -667,41 +653,31 @@ func TestConcurrentFetchers(t *testing.T) {
fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, initialConcurrency, 0)

fetchedRecords := make([]*kgo.Record, 0)
fetchedCount := atomic.NewInt64(0)

fetchRecords := func(duration time.Duration) {
deadline := time.Now().Add(duration)
for time.Now().Before(deadline) {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, r)
fetchedCount.Inc()
})
}
}

// Initial fetch with starting concurrency
fetchRecords(2 * time.Second)
initialFetched := fetchedCount.Load()
fetches := longPollFetches(fetchers, math.MaxInt, 2*time.Second)
initialFetched := fetches.NumRecords()

// Update to higher concurrency
fetchers.Update(ctx, 4)
fetchRecords(3 * time.Second)
highConcurrencyFetched := fetchedCount.Load() - initialFetched
fetches = longPollFetches(fetchers, math.MaxInt, 3*time.Second)
highConcurrencyFetched := fetches.NumRecords()

// Update to lower concurrency
fetchers.Update(ctx, 1)
fetchRecords(3 * time.Second)
fetches = longPollFetches(fetchers, math.MaxInt, 3*time.Second)
lowerConcurrentFetched := fetches.NumRecords()

cancelProduce()
// Produce everything that's left now.
fetchRecords(time.Second)
// Consume everything that's left now.
fetches = longPollFetches(fetchers, math.MaxInt, 2*time.Second)
finalFetched := fetches.NumRecords()
totalProduced := producedCount.Load()
totalFetched := fetchedCount.Load()
totalFetched := initialFetched + highConcurrencyFetched + lowerConcurrentFetched + finalFetched

// Verify fetched records
assert.True(t, totalFetched > 0, "Expected to fetch some records")
assert.Equal(t, totalFetched, totalProduced, "Should not fetch more records than produced")
assert.Equal(t, int64(totalFetched), totalProduced, "Should not fetch more records than produced")
assert.True(t, highConcurrencyFetched > initialFetched, "Expected to fetch more records with higher concurrency")

// Verify record contents
@@ -754,16 +730,9 @@ func TestConcurrentFetchers(t *testing.T) {
produceRecord(ctx, t, client, topicName, partitionID, []byte(record))
}

fetchedRecords := make([]*kgo.Record, 0, additionalRecords)
fetchDeadline := time.Now().Add(5 * time.Second)

// Fetch records
for len(fetchedRecords) < additionalRecords && time.Now().Before(fetchDeadline) {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, r)
})
}
fetches := longPollFetches(fetchers, additionalRecords, 5*time.Second)
fetchedRecords := fetches.Records()

// Verify fetched records
assert.LessOrEqual(t, len(fetchedRecords), additionalRecords,
@@ -813,13 +782,11 @@ func TestConcurrentFetchers(t *testing.T) {

// Expect that we've received all records.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initiallyProducedRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}
fetches := longPollFetches(fetchers, initiallyProducedRecords, 2*time.Second)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})

// Produce a few more records
const additionalRecords = 3
Expand All @@ -830,13 +797,11 @@ func TestConcurrentFetchers(t *testing.T) {
}

// Fetchers shouldn't be stalled and should continue fetching as the HWM moves forward.
for len(fetchedRecordsBytes) < initiallyProducedRecords+additionalRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}
fetches = longPollFetches(fetchers, additionalRecords, 2*time.Second)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})

assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes)

@@ -878,13 +843,11 @@ func TestConcurrentFetchers(t *testing.T) {

// Fetch and verify records; this should unblock the fetchers.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initialRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}
fetches := longPollFetches(fetchers, initialRecords, 2*time.Second) // Wait until all initial records are fetched or the timeout expires.
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})

// Set up control function to monitor fetch requests
var checkRequestOffset func(req kmsg.Request) (kmsg.Response, error, bool)
@@ -1057,10 +1020,9 @@ func TestConcurrentFetchers(t *testing.T) {
assert.LessOrEqualf(t, fetchers.BufferedRecords(), int64(maxInflightBytes), "Should still not buffer more than %d bytes after consuming some records", maxInflightBytes)

// Consume all remaining records and verify total
for totalConsumedRecords < totalProducedRecords {
fetches, _ = fetchers.PollFetches(ctx)
totalConsumedRecords += fetches.NumRecords()
}
// We produce a lot of data; give it enough time so that slow CI machines don't flake.
fetches = longPollFetches(fetchers, totalProducedRecords-totalConsumedRecords, 20*time.Second)
totalConsumedRecords += fetches.NumRecords()

// Allow time for more fetches
waitForStableBufferedRecords(t, fetchers)
@@ -1085,6 +1047,8 @@ func TestConcurrentFetchers(t *testing.T) {
smallRecordSize = 1000
)

require.True(t, smallRecordsCount%2 == 0, "we divide the smallRecordsCount by 2 later on, it must be divisible by 2")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

@@ -1107,11 +1071,8 @@ func TestConcurrentFetchers(t *testing.T) {

assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of large records", maxInflightBytes)
// Consume all large records
consumedRecords := 0
for consumedRecords < largeRecordsCount {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches := longPollFetches(fetchers, largeRecordsCount, 10*time.Second)
consumedRecords := fetches.NumRecords()

pollFetchesAndAssertNoRecords(t, fetchers)
t.Log("Consumed all large records")
Expand All @@ -1125,10 +1086,8 @@ func TestConcurrentFetchers(t *testing.T) {
t.Logf("Produced %d small records", smallRecordsCount)

// Consume half of the small records. This should be enough to stabilize the records size estimation.
for consumedRecords < largeRecordsCount+smallRecordsCount/2 {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second)
consumedRecords += fetches.NumRecords()
t.Log("Consumed half of the small records")

// Assert that the buffer is well utilized.
Expand All @@ -1139,14 +1098,13 @@ func TestConcurrentFetchers(t *testing.T) {
assert.GreaterOrEqual(t, fetchers.BufferedBytes(), int64(maxInflightBytes/2), "Should still buffer a decent number of records")

// Consume the rest of the small records.
const totalProducedRecords = largeRecordsCount + smallRecordsCount
for consumedRecords < totalProducedRecords {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second)
consumedRecords += fetches.NumRecords()
t.Log("Consumed rest of the small records")

// Verify we received correct number of records
const totalProducedRecords = largeRecordsCount + smallRecordsCount
assert.Equal(t, totalProducedRecords, consumedRecords, "Should have consumed all records")

// Verify no more records are buffered. First wait for the buffered records to stabilize.
@@ -1374,7 +1332,8 @@ func TestFetchResult_Merge(t *testing.T) {
}

func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency int, maxInflightBytes int32) *concurrentFetchers {
logger := log.NewNopLogger()
logger := testingLogger.WithT(t)

reg := prometheus.NewPedanticRegistry()
metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{})

@@ -1419,6 +1378,23 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
return f
}

// longPollFetches polls fetches until the timeout is reached or the number of records is at least minRecords.
func longPollFetches(fetchers *concurrentFetchers, minRecords int, timeout time.Duration) kgo.Fetches {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

allFetches := make(kgo.Fetches, 0)
for ctx.Err() == nil && allFetches.NumRecords() < minRecords {
fetches, _ := fetchers.PollFetches(ctx)
if fetches.Err() != nil {
continue
}
allFetches = append(allFetches, fetches...)
}

return allFetches
}

// pollFetchesAndAssertNoRecords ensures that PollFetches() returns 0 records and there are
// no buffered records in fetchers. Since some records are discarded in the PollFetches(),
// we may have to call it multiple times to process all buffered records that need to be
@@ -1439,7 +1415,7 @@ func pollFetchesAndAssertNoRecords(t *testing.T, fetchers *concurrentFetchers) {
}

// We always expect that PollFetches() returns zero records.
require.Len(t, fetches.Records(), 0)
assert.Len(t, fetches.Records(), 0)

// If there are no buffered records, we're good. We can end the assertion.
if fetchers.BufferedRecords() == 0 {
