Skip to content

Commit

Permalink
refactor back to a switch
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes committed May 20, 2022
1 parent abce88e commit abf5a3c
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,16 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
return
case <-ticker.C:
ops, err := g.conn.readPartitions(topic)
if err == nil || errors.Is(err, UnknownTopicOrPartition) {
switch {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
g.log(func(l Logger) {
l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
})
return
}
} else {

default:
g.logError(func(l Logger) {
l.Printf("Problem getting partitions while checking for changes, %v", err)
})
Expand Down Expand Up @@ -725,20 +727,24 @@ func (cg *ConsumerGroup) run() {
// joining or syncing the group.
var backoff <-chan time.Time

if err == nil {
switch {
case err == nil:
// no error...the previous generation finished normally.
continue
} else if errors.Is(err, ErrGroupClosed) {

case errors.Is(err, ErrGroupClosed):
// the CG has been closed...leave the group and exit loop.
_ = cg.leaveGroup(memberID)
return
} else if errors.Is(err, RebalanceInProgress) {

case errors.Is(err, RebalanceInProgress):
// in case of a RebalanceInProgress, don't leave the group or
// change the member ID, but report the error. the next attempt
// to join the group will then be subject to the rebalance
// timeout, so the broker will be responsible for throttling
// this loop.
} else {

default:
// leave the group and report the error if we had gotten far
// enough so as to have a member ID. also clear the member id
// so we don't attempt to use it again. in order to avoid
Expand Down

0 comments on commit abf5a3c

Please sign in to comment.