Ensure ordering of offset commit requests #2941

Closed · wants to merge 3 commits
offset_manager.go: 14 changes (9 additions & 5 deletions)
@@ -42,6 +42,8 @@ type offsetManager struct {
 	poms     map[string]map[int32]*partitionOffsetManager
 	pomsLock sync.RWMutex
 
+	commitLock sync.Mutex
+
 	closeOnce sync.Once
 	closing   chan none
 	closed    chan none
@@ -251,17 +253,19 @@ func (om *offsetManager) Commit() {
 }
 
 func (om *offsetManager) flushToBroker() {
-	req := om.constructRequest()
-	if req == nil {
-		return
-	}
-
 	broker, err := om.coordinator()
 	if err != nil {
 		om.handleError(err)
 		return
 	}
 
+	om.commitLock.Lock()
+	defer om.commitLock.Unlock()
+	req := om.constructRequest()
+	if req == nil {
+		return
+	}
+
 	resp, err := broker.CommitOffset(req)
 	if err != nil {
 		om.handleError(err)
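The fix is easiest to see in isolation. Before this change, `flushToBroker()` built the commit request first (`constructRequest()` snapshots the dirty offsets) with no mutual exclusion around the snapshot-and-send pair, so two concurrent `Commit()` calls could take snapshots in one order and hit the wire in the other, leaving a stale offset as the group's last committed position. Holding `commitLock` across both steps makes snapshot order equal send order. Below is a minimal standalone sketch of that invariant — the `committer` type and its names are hypothetical, not Sarama's API:

```go
package main

import (
	"fmt"
	"sync"
)

// committer sketches the invariant offsetManager.commitLock enforces:
// the offset snapshot (constructRequest analogue) and the send
// (CommitOffset analogue) happen atomically, so the "broker" receives
// per-partition offsets in non-decreasing order.
type committer struct {
	mu       sync.Mutex      // plays the role of commitLock
	dirty    map[int32]int64 // partition -> latest marked offset
	lastSent map[int32]int64 // partition -> last offset the "broker" received
}

func newCommitter() *committer {
	return &committer{dirty: map[int32]int64{}, lastSent: map[int32]int64{}}
}

func (c *committer) mark(partition int32, offset int64) {
	c.mu.Lock()
	c.dirty[partition] = offset
	c.mu.Unlock()
}

// flush holds mu across snapshot AND send, as the fixed flushToBroker does.
// Move the lastSent update outside the lock and the regression check can fire.
func (c *committer) flush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for p, o := range c.dirty { // snapshot
		if o < c.lastSent[p] { // "broker-side" ordering check
			fmt.Printf("regression on partition %d: %d < %d\n", p, o, c.lastSent[p])
		}
		c.lastSent[p] = o // send
	}
}

func main() {
	c := newCommitter()
	var wg sync.WaitGroup
	for p := int32(0); p < 4; p++ {
		wg.Add(1)
		go func(p int32) { // one marker goroutine per partition, as in the test below
			defer wg.Done()
			for o := int64(1); o <= 1000; o++ {
				c.mark(p, o)
				c.flush()
			}
		}(p)
	}
	wg.Wait()
	fmt.Println("final:", c.lastSent)
}
```

Note the trade-off this implies for the real code: the lock is held across the network round-trip, so concurrent commits are serialized; the PR prefers that cost over out-of-order commits, which can silently rewind a group's progress.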
offset_manager_test.go: 104 changes (104 additions & 0 deletions)
@@ -3,6 +3,8 @@ package sarama
 import (
 	"errors"
 	"fmt"
+	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -82,6 +84,9 @@ func TestNewOffsetManager(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
 	seedBroker.Returns(metadataResponse)
+	findCoordResponse := new(FindCoordinatorResponse)
+	findCoordResponse.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
+	seedBroker.Returns(findCoordResponse)
 	defer seedBroker.Close()
 
 	testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
@@ -102,6 +107,105 @@ func TestNewOffsetManager(t *testing.T) {
 	}
 }
 
+// Test that the correct sequence of offset commit messages is sent to a broker when
+// multiple goroutines for a group are committing offsets at the same time
+func TestOffsetManagerCommitSequence(t *testing.T) {
+	lastOffset := map[int32]int64{}
+	var outOfOrder atomic.Pointer[string]
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+	seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
+		"MetadataRequest": func(req *request) encoderWithHeader {
+			resp := new(MetadataResponse)
+			resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
+			return resp
+		},
+		"FindCoordinatorRequest": func(req *request) encoderWithHeader {
+			resp := new(FindCoordinatorResponse)
+			resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
+			return resp
+		},
+		"OffsetFetchRequest": func(r *request) encoderWithHeader {
+			req := r.body.(*OffsetFetchRequest)
+			resp := new(OffsetFetchResponse)
+			resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
+			for topic, partitions := range req.partitions {
+				for _, partition := range partitions {
+					if _, ok := resp.Blocks[topic]; !ok {
+						resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
+					}
+					resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
+						Offset: 0,
+						Err:    ErrNoError,
+					}
+				}
+			}
+			return resp
+		},
+		"OffsetCommitRequest": func(r *request) encoderWithHeader {
+			req := r.body.(*OffsetCommitRequest)
+			if outOfOrder.Load() == nil {
+				for partition, offset := range req.blocks["topic"] {
+					last := lastOffset[partition]
+					if last > offset.offset {
+						msg := fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
+							partition, last, offset.offset)
+						outOfOrder.Store(&msg)
+					}
+					lastOffset[partition] = offset.offset
+				}
+			}
+
+			// Potentially yield, to try to avoid each goroutine running sequentially to completion
+			runtime.Gosched()
+
+			resp := new(OffsetCommitResponse)
+			resp.Errors = map[string]map[int32]KError{}
+			resp.Errors["topic"] = map[int32]KError{}
+			for partition := range req.blocks["topic"] {
+				resp.Errors["topic"][partition] = ErrNoError
+			}
+			return resp
+		},
+	})
+	testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, testClient)
+	om, err := NewOffsetManagerFromClient("group", testClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, om)
+
+	const numPartitions = 10
+	const commitsPerPartition = 1000
+
+	var wg sync.WaitGroup
+	for p := 0; p < numPartitions; p++ {
+		pom, err := om.ManagePartition("topic", int32(p))
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		wg.Add(1)
+		go func() {
+			for c := 0; c < commitsPerPartition; c++ {
+				pom.MarkOffset(int64(c+1), "")
+				om.Commit()
+			}
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+	errMsg := outOfOrder.Load()
+	if errMsg != nil {
+		t.Error(*errMsg)
+	}
+}
+
 var offsetsautocommitTestTable = []struct {
 	name string
 	set  bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
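One detail of the test worth calling out: the mock broker's handlers run on the broker's own goroutine, where calling `t.Error` after the test function may have returned would be unsafe, so the handler publishes the first out-of-order observation through an `atomic.Pointer[string]` and the test goroutine reports it once after `wg.Wait()`. Below is a tiny standalone sketch of that first-error pattern — the `firstError` helper is hypothetical, and it uses `CompareAndSwap` to make first-write-wins explicit, where the test above gets the same effect with `Load`/`Store` because writes are effectively serialized on the single coordinator connection:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// firstError keeps only the first message recorded, from any goroutine.
type firstError struct{ p atomic.Pointer[string] }

func (f *firstError) record(msg string) {
	// CompareAndSwap from nil: the first writer wins, later messages are dropped.
	f.p.CompareAndSwap(nil, &msg)
}

func (f *firstError) get() *string { return f.p.Load() }

func main() {
	var fe firstError
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fe.record(fmt.Sprintf("failure from goroutine %d", i))
		}(i)
	}
	wg.Wait()
	if msg := fe.get(); msg != nil {
		fmt.Println(*msg) // exactly one of the eight messages
	}
}
```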