Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't call kinit cmd from rd_kafka_new() #3340

Merged
merged 7 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ librdkafka v1.7.0 is feature release:
* The initial cluster connection on client instance creation could sometimes
be delayed up to 1 second if a `group.id` or `transactional.id`
was configured (#3305).
* Speed up triggering of new broker connections in certain cases by exiting
the broker thread io/op poll loop when a wakeup op is received.
* SASL GSSAPI: The Kerberos kinit refresh command was triggered from
`rd_kafka_new()` which made this call blocking if the refresh command
was taking long. The refresh is now performed by the background rdkafka
main thread.

### Consumer fixes

Expand All @@ -68,6 +74,10 @@ librdkafka v1.7.0 is feature release:
timed auto committer is only employed in the steady state when no rebalances
are taking places. Offsets are still auto committed when partitions are
revoked.
* Retriable FindCoordinatorRequest errors are no longer propagated to
the application as they are retried automatically.
* Fix rare crash (assert `rktp_started`) on consumer termination
(introduced in v1.6.0).

### Producer fixes

Expand Down
50 changes: 34 additions & 16 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2965,15 +2965,16 @@ static void rd_kafka_broker_prepare_destroy (rd_kafka_broker_t *rkb) {
* @brief Serve a broker op (an op posted by another thread to be handled by
* this broker's thread).
*
* @returns 0 if calling op loop should break out, else 1 to continue.
* @returns true if calling op loop should break out, else false to continue.
* @locality broker thread
* @locks none
*/
static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
rd_kafka_op_t *rko) {
static RD_WARN_UNUSED_RESULT
rd_bool_t rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
rd_kafka_op_t *rko) {
rd_kafka_toppar_t *rktp;
rd_kafka_resp_err_t topic_err;
int ret = 1;
rd_bool_t wakeup = rd_false;

rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

Expand Down Expand Up @@ -3300,10 +3301,11 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
"Client is terminating");

rd_kafka_broker_prepare_destroy(rkb);
ret = 0;
wakeup = rd_true;
break;

case RD_KAFKA_OP_WAKEUP:
wakeup = rd_true;
break;

case RD_KAFKA_OP_PURGE:
Expand Down Expand Up @@ -3345,6 +3347,8 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,

/* Expedite next reconnect */
rkb->rkb_ts_reconnect = 0;

wakeup = rd_true;
break;

default:
Expand All @@ -3355,7 +3359,7 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
if (rko)
rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);

return ret;
return wakeup;
}


Expand All @@ -3364,13 +3368,13 @@ static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
* @brief Serve broker ops.
* @returns the number of ops served
*/
static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb,
rd_ts_t timeout_us) {
static RD_WARN_UNUSED_RESULT
int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, rd_ts_t timeout_us) {
rd_kafka_op_t *rko;
int cnt = 0;

while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0)) &&
(cnt++, rd_kafka_broker_op_serve(rkb, rko)))
(cnt++, !rd_kafka_broker_op_serve(rkb, rko)))
timeout_us = RD_POLL_NOWAIT;

return cnt;
Expand All @@ -3389,12 +3393,17 @@ static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb,
*
* @param abs_timeout Maximum block time (absolute time).
*
* @returns true on wakeup (broker state machine needs to be served),
* else false.
*
* @locality broker thread
* @locks none
*/
static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
rd_ts_t abs_timeout) {
static RD_WARN_UNUSED_RESULT
rd_bool_t rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
rd_ts_t abs_timeout) {
rd_ts_t now;
rd_bool_t wakeup;

if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
abs_timeout = rd_clock() + 1000;
Expand All @@ -3420,7 +3429,8 @@ static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,


/* Serve broker ops */
rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));
wakeup = rd_kafka_broker_ops_serve(rkb,
rd_timeout_remains_us(abs_timeout));

/* An op might have triggered the need for a connection, if so
* transition to TRY_CONNECT state. */
Expand All @@ -3430,12 +3440,15 @@ static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
rd_kafka_broker_set_state(
rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
rd_kafka_broker_unlock(rkb);
wakeup = rd_true;
}

/* Scan queues for timeouts. */
now = rd_clock();
if (rd_interval(&rkb->rkb_timeout_scan_intvl, 1000000, now) > 0)
rd_kafka_broker_timeout_scan(rkb, now);

return wakeup;
}


Expand Down Expand Up @@ -3568,16 +3581,18 @@ rd_kafka_broker_toppars_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
static void rd_kafka_broker_internal_serve (rd_kafka_broker_t *rkb,
rd_ts_t abs_timeout) {
int initial_state = rkb->rkb_state;
rd_bool_t wakeup;

if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
/* Consumer */
do {
rd_kafka_broker_consumer_toppars_serve(rkb);

rd_kafka_broker_ops_io_serve(rkb, abs_timeout);
wakeup = rd_kafka_broker_ops_io_serve(rkb, abs_timeout);

} while (!rd_kafka_broker_terminating(rkb) &&
(int)rkb->rkb_state == initial_state &&
!wakeup &&
!rd_timeout_expired(rd_timeout_remains(abs_timeout)));
} else {
/* Producer */
Expand All @@ -3591,11 +3606,12 @@ static void rd_kafka_broker_internal_serve (rd_kafka_broker_t *rkb,
rd_kafka_broker_toppars_timeout_scan(
rkb, now);

rd_kafka_broker_ops_io_serve(
wakeup = rd_kafka_broker_ops_io_serve(
rkb, RD_MIN(abs_timeout, next_timeout_scan));

} while (!rd_kafka_broker_terminating(rkb) &&
(int)rkb->rkb_state == initial_state &&
!wakeup &&
!rd_timeout_expired(rd_timeout_remains(abs_timeout)));
}
}
Expand Down Expand Up @@ -4012,7 +4028,8 @@ static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb,
if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0))
rd_kafka_broker_retry_bufs_move(rkb, &next_wakeup);

rd_kafka_broker_ops_io_serve(rkb, next_wakeup);
if (rd_kafka_broker_ops_io_serve(rkb, next_wakeup))
return; /* Wakeup */

rd_kafka_broker_lock(rkb);
}
Expand Down Expand Up @@ -5002,7 +5019,8 @@ static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb,
if (min_backoff > abs_timeout)
min_backoff = abs_timeout;

rd_kafka_broker_ops_io_serve(rkb, min_backoff);
if (rd_kafka_broker_ops_io_serve(rkb, min_backoff))
return; /* Wakeup */

rd_kafka_broker_lock(rkb);
}
Expand Down
46 changes: 38 additions & 8 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
rd_kafka_cgrp_t *rkcg = opaque;
struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;
char *errstr = NULL;
int actions;

if (likely(!(ErrorCode = err))) {
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1)
Expand All @@ -583,7 +584,7 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
}

if (ErrorCode)
goto err2;
goto err;


mdb.id = CoordId;
Expand All @@ -604,7 +605,7 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
ErrorCode = rkbuf->rkbuf_err;
/* FALLTHRU */

err2:
err:
if (!errstr)
errstr = (char *)rd_kafka_err2str(ErrorCode);

Expand All @@ -616,12 +617,31 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
return;

/* No need for retries since the coord query is intervalled. */
actions = rd_kafka_err_action(
rkb, ErrorCode, request,

RD_KAFKA_ERR_ACTION_RETRY|RD_KAFKA_ERR_ACTION_REFRESH,
RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR__TRANSPORT,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR__TIMED_OUT,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,

if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
RD_KAFKA_ERR_ACTION_END);



if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
rd_kafka_cgrp_coord_update(rkcg, -1);
else {
if (rkcg->rkcg_last_err != ErrorCode) {
} else {
if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) &&
rkcg->rkcg_last_err != ErrorCode) {
/* Propagate non-retriable errors to the application */
rd_kafka_consumer_err(
rkcg->rkcg_q, rd_kafka_broker_id(rkb),
ErrorCode, 0, NULL, NULL,
Expand All @@ -632,8 +652,10 @@ static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
rkcg->rkcg_last_err = ErrorCode;
}

/* Continue querying */
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
/* Retries are performed by the timer-intervalled
* coord queries, continue querying */
rd_kafka_cgrp_set_state(
rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
}

rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
Expand Down Expand Up @@ -2822,6 +2844,14 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0);
rk->rk_consumer.wait_commit_cnt--;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
rd_kafka_op_destroy(rko_orig);
return; /* Handle is terminating, this op may be handled
* by the op enq()ing thread rather than the
* rdkafka main thread, it is not safe to
* continue here. */
}

/* Update the committed offsets for each partition's rktp. */
errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets);

Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -4526,6 +4526,8 @@ ut_create_msgs (rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) {

rkm = ut_rd_kafka_msg_new(0);
rkm->rkm_u.producer.msgid = msgid++;
rkm->rkm_ts_enq = rd_clock();
rkm->rkm_ts_timeout = rkm->rkm_ts_enq + (900 * 1000 * 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hard coded constant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, it's a unit test


rd_kafka_msgq_enq(rkmq, rkm);
}
Expand Down Expand Up @@ -4564,13 +4566,16 @@ static int unittest_idempotent_producer (void) {
int retry_msg_cnt = 0;
int drcnt = 0;
rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
const char *tmp;
int i, r;

RD_UT_SAY("Verifying idempotent producer error handling");

conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0);
rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0);
if ((tmp = rd_getenv("TEST_DEBUG", NULL)))
rd_kafka_conf_set(conf, "debug", tmp, NULL, 0);
if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) !=
RD_KAFKA_CONF_OK)
RD_UT_FAIL("Failed to enable idempotence");
Expand Down
Loading