diff --git a/admin.go b/admin.go index d226b7623..a6114a867 100644 --- a/admin.go +++ b/admin.go @@ -1033,6 +1033,9 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request := &DeleteGroupsRequest{ Groups: []string{group}, } + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } resp, err := coordinator.DeleteGroups(request) if err != nil { diff --git a/delete_groups_request.go b/delete_groups_request.go index 2158370d5..2fdfc3386 100644 --- a/delete_groups_request.go +++ b/delete_groups_request.go @@ -27,11 +27,18 @@ func (r *DeleteGroupsRequest) headerVersion() int16 { } func (r *DeleteGroupsRequest) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 1 } func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { - return V1_1_0_0 + switch r.Version { + case 1: + return V2_0_0_0 + case 0: + return V1_1_0_0 + default: + return V2_0_0_0 + } } func (r *DeleteGroupsRequest) AddGroup(group string) { diff --git a/delete_groups_response.go b/delete_groups_response.go index 2d77ed2b2..e490f8314 100644 --- a/delete_groups_response.go +++ b/delete_groups_response.go @@ -71,11 +71,18 @@ func (r *DeleteGroupsResponse) headerVersion() int16 { } func (r *DeleteGroupsResponse) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 1 } func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { - return V1_1_0_0 + switch r.Version { + case 1: + return V2_0_0_0 + case 0: + return V1_1_0_0 + default: + return V2_0_0_0 + } } func (r *DeleteGroupsResponse) throttleTime() time.Duration {