Skip to content

Commit

Permalink
Execute GSSAPI kinit from background thread and wait for it to finish…
Browse files Browse the repository at this point in the history
… before connecting

Fixes confluentinc/confluent-kafka-python#1023
  • Loading branch information
edenhill committed Apr 8, 2021
1 parent beebfa1 commit 3b137ba
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ librdkafka v1.7.0 is feature release:
`SSLv23_client_method()`.
* Speed up triggering of new broker connections in certain cases by exiting
the broker thread io/op poll loop when a wakeup op is received.
* SASL GSSAPI: The Kerberos kinit refresh command was triggered from
`rd_kafka_new()` which made this call blocking if the refresh command
was taking long. The refresh is now performed by the background rdkafka
main thread.

### Consumer fixes

Expand Down
42 changes: 38 additions & 4 deletions src/rdkafka_sasl_cyrus.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ static mtx_t rd_kafka_sasl_cyrus_kinit_lock;
*/
typedef struct rd_kafka_sasl_cyrus_handle_s {
rd_kafka_timer_t kinit_refresh_tmr;
rd_atomic32_t ready; /**< First kinit command has finished, or there
* is no kinit command. */
} rd_kafka_sasl_cyrus_handle_t;

/**
Expand Down Expand Up @@ -197,10 +199,12 @@ static ssize_t render_callback (const char *key, char *buf,
* @locality rdkafka main thread
*/
static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_t *rk) {
rd_kafka_sasl_cyrus_handle_t *handle = rk->rk_sasl.handle;
int r;
char *cmd;
char errstr[128];
rd_ts_t ts_start;
int duration;

/* Build kinit refresh command line using string rendering and config */
cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
Expand All @@ -226,6 +230,21 @@ static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_t *rk) {
r = system(cmd);
mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock);

duration = (int)((rd_clock() - ts_start) / 1000);
if (duration > 5000)
rd_kafka_log(rk, LOG_WARNING, "SASLREFRESH",
"Slow Kerberos ticket refresh: %dms: %s",
duration, cmd);

/* Regardless of outcome from the kinit command (it can fail
* even if the ticket is available), we now allow broker connections. */
if (rd_atomic32_add(&handle->ready, 1) == 1) {
rd_kafka_dbg(rk, SECURITY, "SASLREFRESH",
"First kinit command finished: waking up "
"broker threads");
rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
}

if (r == -1) {
if (errno == ECHILD) {
rd_kafka_log(rk, LOG_WARNING, "SASLREFRESH",
Expand Down Expand Up @@ -259,8 +278,7 @@ static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_t *rk) {
rd_free(cmd);

rd_kafka_dbg(rk, SECURITY, "SASLREFRESH",
"Kerberos ticket refreshed in %"PRId64"ms",
(rd_clock() - ts_start) / 1000);
"Kerberos ticket refreshed in %dms", duration);
return 0;
}

Expand Down Expand Up @@ -547,6 +565,19 @@ static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans,
}


/**
* @brief SASL/GSSAPI is ready when at least one kinit command has been
* executed (regardless of exit status).
*/
static rd_bool_t rd_kafka_sasl_cyrus_ready (rd_kafka_t *rk) {
rd_kafka_sasl_cyrus_handle_t *handle = rk->rk_sasl.handle;

if (!handle)
return rd_false;

return rd_atomic32_get(&handle->ready) > 0;
}

/**
* @brief Per-client-instance initializer
*/
Expand All @@ -566,8 +597,10 @@ static int rd_kafka_sasl_cyrus_init (rd_kafka_t *rk,
rk->rk_conf.sasl.relogin_min_time * 1000ll,
rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb, rk);

/* Acquire or refresh ticket */
rd_kafka_sasl_cyrus_kinit_refresh(rk);
/* Kick off the timer immediately to refresh the ticket.
* (Timer is triggered from the main loop). */
rd_kafka_timer_override_once(&rk->rk_timers, &handle->kinit_refresh_tmr,
0/*immediately*/);

return 0;
}
Expand Down Expand Up @@ -653,5 +686,6 @@ const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = {
.client_new = rd_kafka_sasl_cyrus_client_new,
.recv = rd_kafka_sasl_cyrus_recv,
.close = rd_kafka_sasl_cyrus_close,
.ready = rd_kafka_sasl_cyrus_ready,
.conf_validate = rd_kafka_sasl_cyrus_conf_validate
};
65 changes: 52 additions & 13 deletions src/rdkafka_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,42 @@ static void rd_kafka_timer_unschedule (rd_kafka_timers_t *rkts,
rtmr->rtmr_next = 0;
}


/**
* @brief Schedule the next firing of the timer at \p abs_time.
*
* @remark Will not update rtmr_interval, only rtmr_next.
*
* @locks_required timers_lock()
*/
static void rd_kafka_timer_schedule_next (rd_kafka_timers_t *rkts,
rd_kafka_timer_t *rtmr,
rd_ts_t abs_time) {
rd_kafka_timer_t *first;

rtmr->rtmr_next = abs_time;

if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
first->rtmr_next > rtmr->rtmr_next) {
TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
cnd_signal(&rkts->rkts_cond);
if (rkts->rkts_wakeq)
rd_kafka_q_yield(rkts->rkts_wakeq, rd_true);
} else
TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
rd_kafka_timer_t *, rtmr_link,
rd_kafka_timer_cmp);
}


/**
* @brief Schedule the next firing of the timer according to the timer's
* interval plus an optional \p extra_us.
*
* @locks_required timers_lock()
*/
static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
rd_kafka_timer_t *rtmr, int extra_us) {
rd_kafka_timer_t *first;

/* Timer has been stopped */
if (!rtmr->rtmr_interval)
Expand All @@ -75,18 +108,8 @@ static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
if (unlikely(!rkts->rkts_enabled))
return;

rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us;

if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
first->rtmr_next > rtmr->rtmr_next) {
TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
cnd_signal(&rkts->rkts_cond);
if (rkts->rkts_wakeq)
rd_kafka_q_yield(rkts->rkts_wakeq, rd_true);
} else
TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
rd_kafka_timer_t *, rtmr_link,
rd_kafka_timer_cmp);
rd_kafka_timer_schedule_next(
rkts, rtmr, rd_clock() + rtmr->rtmr_interval + extra_us);
}

/**
Expand Down Expand Up @@ -181,6 +204,22 @@ void rd_kafka_timer_exp_backoff (rd_kafka_timers_t *rkts,
rd_kafka_timers_unlock(rkts);
}

/**
* @brief Override the interval once for the next firing of the timer.
*
* @locks_required none
* @locks_acquired timers_lock
*/
void rd_kafka_timer_override_once (rd_kafka_timers_t *rkts,
rd_kafka_timer_t *rtmr,
rd_ts_t interval) {
rd_kafka_timers_lock(rkts);
if (rd_kafka_timer_scheduled(rtmr))
rd_kafka_timer_unschedule(rkts, rtmr);
rd_kafka_timer_schedule_next(rkts, rtmr, rd_clock() + interval);
rd_kafka_timers_unlock(rkts);
}


/**
* @returns the delta time to the next time (>=0) this timer fires, or -1
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ void rd_kafka_timer_exp_backoff (rd_kafka_timers_t *rkts,
rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
int do_lock);

void rd_kafka_timer_override_once (rd_kafka_timers_t *rkts,
rd_kafka_timer_t *rtmr,
rd_ts_t interval);

/**
* @returns true if timer is started.
*
Expand Down

0 comments on commit 3b137ba

Please sign in to comment.