Skip to content

Commit

Permalink
refactor back to a switch
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes committed May 20, 2022
1 parent abce88e commit 8c22b7c
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,20 +725,24 @@ func (cg *ConsumerGroup) run() {
// joining or syncing the group.
var backoff <-chan time.Time

if err == nil {
switch {
case err == nil:
// no error...the previous generation finished normally.
continue
} else if errors.Is(err, ErrGroupClosed) {

case errors.Is(err, ErrGroupClosed):
// the CG has been closed...leave the group and exit loop.
_ = cg.leaveGroup(memberID)
return
} else if errors.Is(err, RebalanceInProgress) {

case errors.Is(err, RebalanceInProgress):
// in case of a RebalanceInProgress, don't leave the group or
// change the member ID, but report the error. the next attempt
// to join the group will then be subject to the rebalance
// timeout, so the broker will be responsible for throttling
// this loop.
} else {

default:
// leave the group and report the error if we had gotten far
// enough so as to have a member ID. also clear the member id
// so we don't attempt to use it again. in order to avoid
Expand Down

0 comments on commit 8c22b7c

Please sign in to comment.