From 9cb6663bd5e13a48582ba57dc511b52a351de90d Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 8 Apr 2021 12:20:17 +0200 Subject: [PATCH 1/7] Exit broker op loop on wakeup --- CHANGELOG.md | 2 ++ src/rdkafka_broker.c | 50 ++++++++++++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8de5cbe58..c3d1803e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,8 @@ librdkafka v1.7.0 is feature release: * The initial cluster connection on client instance creation could sometimes be delayed up to 1 second if a `group.id` or `transactional.id` was configured (#3305). + * Speed up triggering of new broker connections in certain cases by exiting + the broker thread io/op poll loop when a wakeup op is received. ### Consumer fixes diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index fcfdc8cdc..a882639e4 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -2965,15 +2965,16 @@ static void rd_kafka_broker_prepare_destroy (rd_kafka_broker_t *rkb) { * @brief Serve a broker op (an op posted by another thread to be handled by * this broker's thread). * - * @returns 0 if calling op loop should break out, else 1 to continue. + * @returns true if calling op loop should break out, else false to continue. * @locality broker thread * @locks none */ -static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, - rd_kafka_op_t *rko) { +static RD_WARN_UNUSED_RESULT +rd_bool_t rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, + rd_kafka_op_t *rko) { rd_kafka_toppar_t *rktp; rd_kafka_resp_err_t topic_err; - int ret = 1; + rd_bool_t wakeup = rd_false; rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); @@ -3300,10 +3301,11 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, "Client is terminating"); rd_kafka_broker_prepare_destroy(rkb); - ret = 0; + wakeup = rd_true; break; case RD_KAFKA_OP_WAKEUP: + wakeup = rd_true; break; case RD_KAFKA_OP_PURGE: @@ -3345,6 +3347,8 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, /* Expedite next reconnect */ rkb->rkb_ts_reconnect = 0; + + wakeup = rd_true; break; default: @@ -3355,7 +3359,7 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, if (rko) rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); - return ret; + return wakeup; } @@ -3364,13 +3368,13 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, * @brief Serve broker ops. * @returns the number of ops served */ -static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, - rd_ts_t timeout_us) { +static RD_WARN_UNUSED_RESULT +int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, rd_ts_t timeout_us) { rd_kafka_op_t *rko; int cnt = 0; while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0)) && - (cnt++, rd_kafka_broker_op_serve(rkb, rko))) + (cnt++, !rd_kafka_broker_op_serve(rkb, rko))) timeout_us = RD_POLL_NOWAIT; return cnt; @@ -3389,12 +3393,17 @@ static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, * * @param abs_timeout Maximum block time (absolute time). * + * @returns true on wakeup (broker state machine needs to be served), + * else false. + * * @locality broker thread * @locks none */ -static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb, - rd_ts_t abs_timeout) { +static RD_WARN_UNUSED_RESULT +rd_bool_t rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb, + rd_ts_t abs_timeout) { rd_ts_t now; + rd_bool_t wakeup; if (unlikely(rd_kafka_terminating(rkb->rkb_rk))) abs_timeout = rd_clock() + 1000; @@ -3420,7 +3429,8 @@ static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb, /* Serve broker ops */ - rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout)); + wakeup = rd_kafka_broker_ops_serve(rkb, + rd_timeout_remains_us(abs_timeout)); /* An op might have triggered the need for a connection, if so * transition to TRY_CONNECT state. */ @@ -3430,12 +3440,15 @@ static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb, rd_kafka_broker_set_state( rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT); rd_kafka_broker_unlock(rkb); + wakeup = rd_true; } /* Scan queues for timeouts. */ now = rd_clock(); if (rd_interval(&rkb->rkb_timeout_scan_intvl, 1000000, now) > 0) rd_kafka_broker_timeout_scan(rkb, now); + + return wakeup; } @@ -3568,16 +3581,18 @@ rd_kafka_broker_toppars_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) { static void rd_kafka_broker_internal_serve (rd_kafka_broker_t *rkb, rd_ts_t abs_timeout) { int initial_state = rkb->rkb_state; + rd_bool_t wakeup; if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) { /* Consumer */ do { rd_kafka_broker_consumer_toppars_serve(rkb); - rd_kafka_broker_ops_io_serve(rkb, abs_timeout); + wakeup = rd_kafka_broker_ops_io_serve(rkb, abs_timeout); } while (!rd_kafka_broker_terminating(rkb) && (int)rkb->rkb_state == initial_state && + !wakeup && !rd_timeout_expired(rd_timeout_remains(abs_timeout))); } else { /* Producer */ @@ -3591,11 +3606,12 @@ static void rd_kafka_broker_internal_serve (rd_kafka_broker_t *rkb, rd_kafka_broker_toppars_timeout_scan( rkb, now); - rd_kafka_broker_ops_io_serve( + wakeup = rd_kafka_broker_ops_io_serve( rkb, RD_MIN(abs_timeout, next_timeout_scan)); } while (!rd_kafka_broker_terminating(rkb) && (int)rkb->rkb_state == initial_state && + !wakeup && !rd_timeout_expired(rd_timeout_remains(abs_timeout))); } } @@ -4012,7 +4028,8 @@ static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb, if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0)) rd_kafka_broker_retry_bufs_move(rkb, &next_wakeup); - rd_kafka_broker_ops_io_serve(rkb, next_wakeup); + if (rd_kafka_broker_ops_io_serve(rkb, next_wakeup)) + return; /* Wakeup */ rd_kafka_broker_lock(rkb); } @@ -5002,7 +5019,8 @@ static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb, if (min_backoff > abs_timeout) min_backoff = abs_timeout; - rd_kafka_broker_ops_io_serve(rkb, min_backoff); + if (rd_kafka_broker_ops_io_serve(rkb, min_backoff)) + return; /* Wakeup */ rd_kafka_broker_lock(rkb); } From c973be69621db9fbd1980a88e9f7fc924219f7fd Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 8 Apr 2021 12:21:25 +0200 Subject: [PATCH 2/7] Execute GSSAPI kinit from background thread and wait for it to finish before connecting Fixes https://github.com/confluentinc/confluent-kafka-python/issues/1023 --- CHANGELOG.md | 4 +++ src/rdkafka_sasl_cyrus.c | 42 +++++++++++++++++++++++--- src/rdkafka_timer.c | 65 ++++++++++++++++++++++++++++++++-------- src/rdkafka_timer.h | 4 +++ 4 files changed, 98 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3d1803e9..8d732baeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,10 @@ librdkafka v1.7.0 is feature release: was configured (#3305). * Speed up triggering of new broker connections in certain cases by exiting the broker thread io/op poll loop when a wakeup op is received. + * SASL GSSAPI: The Kerberos kinit refresh command was triggered from + `rd_kafka_new()` which made this call blocking if the refresh command + was taking long. The refresh is now performed by the background rdkafka + main thread. ### Consumer fixes diff --git a/src/rdkafka_sasl_cyrus.c b/src/rdkafka_sasl_cyrus.c index 1cc5ed07f..ed14c16b2 100644 --- a/src/rdkafka_sasl_cyrus.c +++ b/src/rdkafka_sasl_cyrus.c @@ -57,6 +57,8 @@ static mtx_t rd_kafka_sasl_cyrus_kinit_lock; */ typedef struct rd_kafka_sasl_cyrus_handle_s { rd_kafka_timer_t kinit_refresh_tmr; + rd_atomic32_t ready; /**< First kinit command has finished, or there + * is no kinit command. */ } rd_kafka_sasl_cyrus_handle_t; /** @@ -197,10 +199,12 @@ static ssize_t render_callback (const char *key, char *buf, * @locality rdkafka main thread */ static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_t *rk) { + rd_kafka_sasl_cyrus_handle_t *handle = rk->rk_sasl.handle; int r; char *cmd; char errstr[128]; rd_ts_t ts_start; + int duration; /* Build kinit refresh command line using string rendering and config */ cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, @@ -226,6 +230,21 @@ static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_t *rk) { r = system(cmd); mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock); + duration = (int)((rd_clock() - ts_start) / 1000); + if (duration > 5000) + rd_kafka_log(rk, LOG_WARNING, "SASLREFRESH", + "Slow Kerberos ticket refresh: %dms: %s", + duration, cmd); + + /* Regardless of outcome from the kinit command (it can fail + * even if the ticket is available), we now allow broker connections. */ + if (rd_atomic32_add(&handle->ready, 1) == 1) { + rd_kafka_dbg(rk, SECURITY, "SASLREFRESH", + "First kinit command finished: waking up " + "broker threads"); + rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT); + } + if (r == -1) { if (errno == ECHILD) { rd_kafka_log(rk, LOG_WARNING, "SASLREFRESH", @@ -259,8 +278,7 @@ static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_t *rk) { rd_free(cmd); rd_kafka_dbg(rk, SECURITY, "SASLREFRESH", - "Kerberos ticket refreshed in %"PRId64"ms", - (rd_clock() - ts_start) / 1000); + "Kerberos ticket refreshed in %dms", duration); return 0; } @@ -547,6 +565,19 @@ static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans, } +/** + * @brief SASL/GSSAPI is ready when at least one kinit command has been + * executed (regardless of exit status). + */ +static rd_bool_t rd_kafka_sasl_cyrus_ready (rd_kafka_t *rk) { + rd_kafka_sasl_cyrus_handle_t *handle = rk->rk_sasl.handle; + + if (!handle) + return rd_false; + + return rd_atomic32_get(&handle->ready) > 0; +} + /** * @brief Per-client-instance initializer */ @@ -566,8 +597,10 @@ static int rd_kafka_sasl_cyrus_init (rd_kafka_t *rk, rk->rk_conf.sasl.relogin_min_time * 1000ll, rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb, rk); - /* Acquire or refresh ticket */ - rd_kafka_sasl_cyrus_kinit_refresh(rk); + /* Kick off the timer immediately to refresh the ticket. + * (Timer is triggered from the main loop). */ + rd_kafka_timer_override_once(&rk->rk_timers, &handle->kinit_refresh_tmr, + 0/*immediately*/); return 0; } @@ -653,5 +686,6 @@ const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = { .client_new = rd_kafka_sasl_cyrus_client_new, .recv = rd_kafka_sasl_cyrus_recv, .close = rd_kafka_sasl_cyrus_close, + .ready = rd_kafka_sasl_cyrus_ready, .conf_validate = rd_kafka_sasl_cyrus_conf_validate }; diff --git a/src/rdkafka_timer.c b/src/rdkafka_timer.c index 2657808a2..58610d92f 100644 --- a/src/rdkafka_timer.c +++ b/src/rdkafka_timer.c @@ -63,9 +63,42 @@ static void rd_kafka_timer_unschedule (rd_kafka_timers_t *rkts, rtmr->rtmr_next = 0; } + +/** + * @brief Schedule the next firing of the timer at \p abs_time. + * + * @remark Will not update rtmr_interval, only rtmr_next. + * + * @locks_required timers_lock() + */ +static void rd_kafka_timer_schedule_next (rd_kafka_timers_t *rkts, + rd_kafka_timer_t *rtmr, + rd_ts_t abs_time) { + rd_kafka_timer_t *first; + + rtmr->rtmr_next = abs_time; + + if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) || + first->rtmr_next > rtmr->rtmr_next) { + TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link); + cnd_signal(&rkts->rkts_cond); + if (rkts->rkts_wakeq) + rd_kafka_q_yield(rkts->rkts_wakeq); + } else + TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr, + rd_kafka_timer_t *, rtmr_link, + rd_kafka_timer_cmp); +} + + +/** + * @brief Schedule the next firing of the timer according to the timer's + * interval plus an optional \p extra_us. + * + * @locks_required timers_lock() + */ static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr, int extra_us) { - rd_kafka_timer_t *first; /* Timer has been stopped */ if (!rtmr->rtmr_interval) @@ -75,18 +108,8 @@ static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts, if (unlikely(!rkts->rkts_enabled)) return; - rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us; - - if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) || - first->rtmr_next > rtmr->rtmr_next) { - TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link); - cnd_signal(&rkts->rkts_cond); - if (rkts->rkts_wakeq) - rd_kafka_q_yield(rkts->rkts_wakeq); - } else - TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr, - rd_kafka_timer_t *, rtmr_link, - rd_kafka_timer_cmp); + rd_kafka_timer_schedule_next( + rkts, rtmr, rd_clock() + rtmr->rtmr_interval + extra_us); } /** @@ -181,6 +204,22 @@ void rd_kafka_timer_exp_backoff (rd_kafka_timers_t *rkts, rd_kafka_timers_unlock(rkts); } +/** + * @brief Override the interval once for the next firing of the timer. + * + * @locks_required none + * @locks_acquired timers_lock + */ +void rd_kafka_timer_override_once (rd_kafka_timers_t *rkts, + rd_kafka_timer_t *rtmr, + rd_ts_t interval) { + rd_kafka_timers_lock(rkts); + if (rd_kafka_timer_scheduled(rtmr)) + rd_kafka_timer_unschedule(rkts, rtmr); + rd_kafka_timer_schedule_next(rkts, rtmr, rd_clock() + interval); + rd_kafka_timers_unlock(rkts); +} + /** * @returns the delta time to the next time (>=0) this timer fires, or -1 diff --git a/src/rdkafka_timer.h b/src/rdkafka_timer.h index ad57695da..8a50b556c 100644 --- a/src/rdkafka_timer.h +++ b/src/rdkafka_timer.h @@ -84,6 +84,10 @@ void rd_kafka_timer_exp_backoff (rd_kafka_timers_t *rkts, rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr, int do_lock); +void rd_kafka_timer_override_once (rd_kafka_timers_t *rkts, + rd_kafka_timer_t *rtmr, + rd_ts_t interval); + /** * @returns true if timer is started. * From 9edb85868f89d1fe6dce5a79525e67a797102b0a Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 8 Apr 2021 12:21:58 +0200 Subject: [PATCH 3/7] Treat libsasl "GSSAPI client step .." logs as debug --- src/rdkafka_sasl_cyrus.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_sasl_cyrus.c b/src/rdkafka_sasl_cyrus.c index ed14c16b2..b9b44d2d9 100644 --- a/src/rdkafka_sasl_cyrus.c +++ b/src/rdkafka_sasl_cyrus.c @@ -337,7 +337,9 @@ static int rd_kafka_sasl_cyrus_cb_log (void *context, int level, "make sure the libsasl2-modules-gssapi-mit or " "cyrus-sasl-gssapi packages are installed"; - if (level >= LOG_DEBUG) + /* Treat the "client step" log messages as debug. */ + if (level >= LOG_DEBUG || + !strncmp(message, "GSSAPI client step ", 19)) rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "%s", message); else From 942d2089729a3404418c08714ae4343c5c28821f Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 8 Apr 2021 14:57:05 +0200 Subject: [PATCH 4/7] Fix unittest stability --- src/rdkafka_request.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 54809838c..54c3750e0 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -4526,6 +4526,8 @@ ut_create_msgs (rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) { rkm = ut_rd_kafka_msg_new(0); rkm->rkm_u.producer.msgid = msgid++; + rkm->rkm_ts_enq = rd_clock(); + rkm->rkm_ts_timeout = rkm->rkm_ts_enq + (900 * 1000 * 1000); rd_kafka_msgq_enq(rkmq, rkm); } @@ -4564,6 +4566,7 @@ static int unittest_idempotent_producer (void) { int retry_msg_cnt = 0; int drcnt = 0; rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); + const char *tmp; int i, r; RD_UT_SAY("Verifying idempotent producer error handling"); @@ -4571,6 +4574,8 @@ static int unittest_idempotent_producer (void) { conf = rd_kafka_conf_new(); rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0); rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0); + if ((tmp = rd_getenv("TEST_DEBUG", NULL))) + rd_kafka_conf_set(conf, "debug", tmp, NULL, 0); if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) != RD_KAFKA_CONF_OK) RD_UT_FAIL("Failed to enable idempotence"); From 9d74c4c551070e5bd305309182f47a53d25a4835 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Fri, 9 Apr 2021 09:58:31 +0200 Subject: [PATCH 5/7] Don't propagate retriable FindCoordinator errors --- CHANGELOG.md | 2 ++ src/rdkafka_cgrp.c | 38 ++++++++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d732baeb..55318db01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,8 @@ librdkafka v1.7.0 is feature release: timed auto committer is only employed in the steady state when no rebalances are taking places. Offsets are still auto committed when partitions are revoked. + * Retriable FindCoordinatorRequest errors are no longer propagated to + the application as they are retried automatically. ### Producer fixes diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 9ef535e49..ebfbcf078 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -561,6 +561,7 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = opaque; struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT; char *errstr = NULL; + int actions; if (likely(!(ErrorCode = err))) { if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) @@ -583,7 +584,7 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk, } if (ErrorCode) - goto err2; + goto err; mdb.id = CoordId; @@ -604,7 +605,7 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk, ErrorCode = rkbuf->rkbuf_err; /* FALLTHRU */ -err2: +err: if (!errstr) errstr = (char *)rd_kafka_err2str(ErrorCode); @@ -616,12 +617,31 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk, if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) return; - /* No need for retries since the coord query is intervalled. */ + actions = rd_kafka_err_action( + rkb, ErrorCode, request, + + RD_KAFKA_ERR_ACTION_RETRY|RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR__TRANSPORT, + + RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR__TIMED_OUT, - if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE) + RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, + + RD_KAFKA_ERR_ACTION_END); + + + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { rd_kafka_cgrp_coord_update(rkcg, -1); - else { - if (rkcg->rkcg_last_err != ErrorCode) { + } else { + if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) && + rkcg->rkcg_last_err != ErrorCode) { + /* Propagate non-retriable errors to the application */ rd_kafka_consumer_err( rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0, NULL, NULL, @@ -632,8 +652,10 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk, rkcg->rkcg_last_err = ErrorCode; } - /* Continue querying */ - rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); + /* Retries are performed by the timer-intervalled + * coord queries, continue querying */ + rd_kafka_cgrp_set_state( + rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); } rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */ From 2685a27b0bb849d7db720c409ed7b3a8a4515d2c Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Fri, 9 Apr 2021 09:57:11 +0200 Subject: [PATCH 6/7] Harden tests by waiting an additional second for new topics to propagate in cluster --- tests/0081-admin.c | 2 ++ tests/test.c | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/tests/0081-admin.c b/tests/0081-admin.c index dd2ffd3d2..ea029e3e4 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -1708,6 +1708,8 @@ static void do_test_DeleteConsumerGroupOffsets (const char *what, NULL, 0, 15*1000); + rd_sleep(1); /* Additional wait time for cluster propagation */ + consumer = test_create_consumer(groupid, NULL, NULL, NULL); if (sub_consumer) { diff --git a/tests/test.c b/tests/test.c index c39121402..3baa62148 100644 --- a/tests/test.c +++ b/tests/test.c @@ -5273,6 +5273,11 @@ void test_wait_topic_exists (rd_kafka_t *rk, const char *topic, int tmout) { rd_kafka_metadata_topic_t topics = { .topic = (char *)topic }; test_wait_metadata_update(rk, &topics, 1, NULL, 0, tmout); + + /* Wait an additional second for the topic to propagate in + * the cluster. This is not perfect but a cheap workaround for + * the asynchronous nature of topic creations in Kafka. */ + rd_sleep(1); } From cb0293b342aaf7ddbaf44cbe270556a3a2b10eb7 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Sat, 10 Apr 2021 19:07:27 +0200 Subject: [PATCH 7/7] A commit op with ERR__DESTROY would call the assignment code from the wrong thread --- CHANGELOG.md | 2 ++ src/rdkafka_cgrp.c | 8 ++++++++ tests/0116-kafkaconsumer_close.cpp | 3 ++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55318db01..7bb540dfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,8 @@ librdkafka v1.7.0 is feature release: revoked. * Retriable FindCoordinatorRequest errors are no longer propagated to the application as they are retried automatically. + * Fix rare crash (assert `rktp_started`) on consumer termination + (introduced in v1.6.0). ### Producer fixes diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index ebfbcf078..4b05a1fd5 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2844,6 +2844,14 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk, rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0); rk->rk_consumer.wait_commit_cnt--; + if (err == RD_KAFKA_RESP_ERR__DESTROY) { + rd_kafka_op_destroy(rko_orig); + return; /* Handle is terminating, this op may be handled + * by the op enq()ing thread rather than the + * rdkafka main thread, it is not safe to + * continue here. */ + } + /* Update the committed offsets for each partition's rktp. */ errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets); diff --git a/tests/0116-kafkaconsumer_close.cpp b/tests/0116-kafkaconsumer_close.cpp index 7ef7efabd..b6bd8ace0 100644 --- a/tests/0116-kafkaconsumer_close.cpp +++ b/tests/0116-kafkaconsumer_close.cpp @@ -70,7 +70,8 @@ static void do_test_consumer_close (bool do_subscribe, delete p; /* Create consumer */ - RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + RdKafka::Conf *conf; + Test::conf_init(&conf, NULL, 0); Test::conf_set(conf, "bootstrap.servers", bootstraps); Test::conf_set(conf, "group.id", "mygroup"); Test::conf_set(conf, "auto.offset.reset", "beginning");