Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(test): use modern protocol versions in FVT #2581

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion functional_consumer_follower_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func TestConsumerFetchFollowerFailover(t *testing.T) {
newConfig := func() *Config {
config := NewFunctionalTestConfig()
config.ClientID = t.Name()
config.Version = V2_8_0_0
config.Producer.Return.Successes = true
return config
}
Expand Down
5 changes: 0 additions & 5 deletions functional_consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func TestFuncConsumerGroupRebalanceAfterAddingPartitions(t *testing.T) {
defer teardownFunctionalTest(t)

config := NewFunctionalTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -243,10 +242,7 @@ func TestFuncConsumerGroupOffsetDeletion(t *testing.T) {
checkKafkaVersion(t, "2.4.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
// create a client with 2.4.0 version as it is the minimal version
// that supports DeleteOffsets request
config := NewFunctionalTestConfig()
config.Version = V2_4_0_0
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
defer safeClose(t, client)
if err != nil {
Expand Down Expand Up @@ -401,7 +397,6 @@ type testFuncConsumerGroupMember struct {
func defaultConfig(clientID string) *Config {
config := NewFunctionalTestConfig()
config.ClientID = clientID
config.Version = V0_10_2_0
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = OffsetOldest
config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
Expand Down
7 changes: 0 additions & 7 deletions functional_consumer_staticmembership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) {

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, 100, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, 100, nil, "test.4")
Expand Down Expand Up @@ -76,15 +74,13 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_4_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_4_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
Expand Down Expand Up @@ -179,15 +175,13 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
Expand All @@ -198,7 +192,6 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {

config3 := NewFunctionalTestConfig()
config3.ClientID = "M3"
config3.Version = V2_3_0_0
config3.Consumer.Offsets.Initial = OffsetNewest
config3.Consumer.Group.InstanceId = "Instance2" // same instance id as config2

Expand Down
1 change: 0 additions & 1 deletion functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
config.Producer.Idempotent = true
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = WaitForAll
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
Expand Down
21 changes: 8 additions & 13 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ const TestBatchSize = 1000

func TestFuncProducing(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
testProducingMessages(t, config)
}

func TestFuncProducingGzip(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
config.Producer.Compression = CompressionGZIP
testProducingMessages(t, config)
}
Expand All @@ -40,19 +44,22 @@ func TestFuncProducingSnappy(t *testing.T) {

func TestFuncProducingZstd(t *testing.T) {
config := NewFunctionalTestConfig()
config.Version = V2_1_0_0
config.Producer.Compression = CompressionZSTD
testProducingMessages(t, config)
}

func TestFuncProducingNoResponse(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
config.Producer.RequiredAcks = NoResponse
testProducingMessages(t, config)
}

func TestFuncProducingFlushing(t *testing.T) {
config := NewFunctionalTestConfig()
// FIXME: KAFKA_VERSION seems to break this test
config.Version = MinVersion
config.Producer.Flush.Messages = TestBatchSize / 8
config.Producer.Flush.Frequency = 250 * time.Millisecond
testProducingMessages(t, config)
Expand Down Expand Up @@ -108,7 +115,6 @@ func TestFuncTxnProduceNoBegin(t *testing.T) {
config.Producer.Return.Errors = true
config.Producer.Transaction.Retry.Max = 200
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
defer producer.Close()
Expand All @@ -135,7 +141,6 @@ func TestFuncTxnCommitNoMessages(t *testing.T) {
config.Producer.Return.Errors = true
config.Producer.Transaction.Retry.Max = 200
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
defer producer.Close()
Expand Down Expand Up @@ -168,7 +173,6 @@ func TestFuncTxnProduce(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -222,7 +226,6 @@ func TestFuncTxnProduceWithBrokerFailure(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -289,7 +292,6 @@ func TestFuncTxnProduceEpochBump(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V2_6_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -358,7 +360,6 @@ func TestFuncInitProducerId3(t *testing.T) {
config.Producer.Retry.Max = 50
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V2_6_0_0

producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -401,7 +402,6 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) {
config.Consumer.IsolationLevel = ReadCommitted
config.Consumer.Offsets.AutoCommit.Enable = false
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -499,7 +499,6 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

configSecond := NewFunctionalTestConfig()
configSecond.ChannelBufferSize = 20
Expand All @@ -511,7 +510,6 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
configSecond.Producer.Retry.Max = 50
configSecond.Consumer.IsolationLevel = ReadCommitted
configSecond.Net.MaxOpenRequests = 1
configSecond.Version = V0_11_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -585,7 +583,6 @@ func TestFuncTxnAbortedProduce(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -671,7 +668,6 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
Expand Down Expand Up @@ -907,7 +903,6 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1024
config.Producer.Retry.Backoff = time.Millisecond * 50
config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)

producer, err := NewAsyncProducer(
FunctionalTestEnv.KafkaBrokerAddrs,
Expand Down
15 changes: 8 additions & 7 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,15 @@ func testMain(m *testing.M) int {
// NewFunctionalTestConfig returns a config meant to be used by functional tests.
func NewFunctionalTestConfig() *Config {
config := NewConfig()
// config.Consumer.Retry.Backoff = 0
// config.Producer.Retry.Backoff = 0
config.Version = MinVersion
version, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
if err != nil {
config.Version = DefaultVersion
} else {
config.Version = version
}
return config
}

Expand Down Expand Up @@ -171,7 +179,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
}

config := NewFunctionalTestConfig()
config.Version, err = ParseKafkaVersion(env.KafkaVersion)
if err != nil {
return err
}
Expand Down Expand Up @@ -315,11 +322,6 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
config.Metadata.Retry.Max = 5
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "sarama-prepareTestTopics"
var err error
config.Version, err = ParseKafkaVersion(env.KafkaVersion)
if err != nil {
return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
}

client, err := NewClient(env.KafkaBrokerAddrs, config)
if err != nil {
Expand Down Expand Up @@ -465,7 +467,6 @@ func ensureFullyReplicated(t testing.TB, timeout time.Duration, retry time.Durat
config.Metadata.Retry.Max = 5
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "sarama-ensureFullyReplicated"
config.Version = V2_6_0_0

var testTopicNames []string
for topic := range testTopicDetails {
Expand Down