Skip to content

Commit

Permalink
A commit op with ERR__DESTROY would call the assignment code from the…
Browse files Browse the repository at this point in the history
… wrong thread
  • Loading branch information
edenhill committed Apr 14, 2021
1 parent 2685a27 commit cb0293b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ librdkafka v1.7.0 is feature release:
revoked.
* Retriable FindCoordinatorRequest errors are no longer propagated to
the application as they are retried automatically.
* Fix rare crash (assert `rktp_started`) on consumer termination
(introduced in v1.6.0).

### Producer fixes

Expand Down
8 changes: 8 additions & 0 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2844,6 +2844,14 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0);
rk->rk_consumer.wait_commit_cnt--;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
rd_kafka_op_destroy(rko_orig);
return; /* Handle is terminating, this op may be handled
* by the op enq()ing thread rather than the
* rdkafka main thread, it is not safe to
* continue here. */
}

/* Update the committed offsets for each partition's rktp. */
errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets);

Expand Down
3 changes: 2 additions & 1 deletion tests/0116-kafkaconsumer_close.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ static void do_test_consumer_close (bool do_subscribe,
delete p;

/* Create consumer */
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *conf;
Test::conf_init(&conf, NULL, 0);
Test::conf_set(conf, "bootstrap.servers", bootstraps);
Test::conf_set(conf, "group.id", "mygroup");
Test::conf_set(conf, "auto.offset.reset", "beginning");
Expand Down

0 comments on commit cb0293b

Please sign in to comment.