diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 5711820b10..7bb4992f42 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -51,6 +51,45 @@ static __inline void rd_kafka_broker_fetch_toppar_del (rd_kafka_broker_t *rkb,
 
 
 
+/**
+ * Toppar based OffsetResponse handling.
+ * This is used for updating the low water mark for consumer lag.
+ *
+ * The opaque is the toppar reference (shptr) attached to the request;
+ * it is released here once the request is done.
+ */
+static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
+                                               rd_kafka_broker_t *rkb,
+                                               rd_kafka_resp_err_t err,
+                                               rd_kafka_buf_t *rkbuf,
+                                               rd_kafka_buf_t *request,
+                                               void *opaque) {
+        shptr_rd_kafka_toppar_t *s_rktp = opaque;
+        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+        int64_t Offset;
+
+        /* Parse and return Offset */
+        err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
+                                     rkbuf, request, rktp, &Offset);
+
+        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+                /* Retry in progress: the request has been scheduled
+                 * for retry with the same request.opaque, so keep
+                 * waiting for the response and do not release the
+                 * toppar reference yet. */
+                return;
+        }
+
+        if (!err)
+                rktp->rktp_lo_offset = Offset;
+
+        rktp->rktp_wait_consumer_lag_resp = 0;
+
+        rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
+}
+
+
+
 /**
  * Request information from broker to keep track of consumer lag.
  *
@@ -858,6 +897,96 @@ void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
 }
 
 
+
+
+/**
+ * Toppar based OffsetResponse handling.
+ * This is used for finding the next offset to Fetch.
+ *
+ * On error (other than termination or a retry in progress) the offset is
+ * reset and a CONSUMER_ERR op is enqueued on the toppar's fetch queue.
+ * The toppar reference in the opaque is released on every path except
+ * while a retry is in progress.
+ */
+static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
+                                           rd_kafka_broker_t *rkb,
+                                           rd_kafka_resp_err_t err,
+                                           rd_kafka_buf_t *rkbuf,
+                                           rd_kafka_buf_t *request,
+                                           void *opaque) {
+        shptr_rd_kafka_toppar_t *s_rktp = opaque;
+        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
+        int64_t Offset;
+
+        /* Parse and return Offset */
+        err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
+                                     rkbuf, request, rktp, &Offset);
+
+        if (err) {
+                rd_kafka_op_t *rko;
+
+                rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+                           "Offset reply error for "
+                           "topic %.*s [%"PRId32"]: %s",
+                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+                           rktp->rktp_partition, rd_kafka_err2str(err));
+
+                if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+                        /* Termination, quick cleanup. */
+
+                        /* from request.opaque */
+                        rd_kafka_toppar_destroy(s_rktp);
+
+                        return;
+                } else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+                        /* Retry in progress: request.opaque is still
+                         * in use, keep the toppar reference. */
+                        return;
+                }
+
+
+                rd_kafka_toppar_lock(rktp);
+                rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
+                                      err,
+                                      "failed to query logical offset");
+
+                /* Signal error back to application,
+                 * unless this is an intermittent problem
+                 * (e.g., connection lost) */
+                rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
+                rko->rko_err = err;
+                if (rktp->rktp_query_offset <=
+                    RD_KAFKA_OFFSET_TAIL_BASE)
+                        rko->rko_rkmessage.offset =
+                                rktp->rktp_query_offset -
+                                RD_KAFKA_OFFSET_TAIL_BASE;
+                else
+                        rko->rko_rkmessage.offset =
+                                rktp->rktp_query_offset;
+                rd_kafka_toppar_unlock(rktp);
+                rko->rko_rktp = rd_kafka_toppar_keep(rktp);
+                rko->rko_rkmessage.partition = rktp->rktp_partition;
+
+                rd_kafka_q_enq(&rktp->rktp_fetchq, rko);
+
+                rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
+                return;
+        }
+
+        rd_kafka_toppar_lock(rktp);
+        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
+                     "Offset %"PRId64" request for %.*s [%"PRId32"] "
+                     "returned offset %s (%"PRId64")",
+                     rktp->rktp_query_offset,
+                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+                     rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);
+
+        rd_kafka_toppar_next_offset_handle(rktp, Offset);
+        rd_kafka_toppar_unlock(rktp);
+
+        rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
+}
+
 /**
  * Send OffsetRequest for toppar.
  *
diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c
index 5f521259f5..e48f933abb 100644
--- a/src/rdkafka_request.c
+++ b/src/rdkafka_request.c
@@ -41,29 +41,46 @@
  * other threads.
*/ -#define RD_KAFKA_ERR_ACTION_PERMANENT 0x1 /* Permanent error */ -#define RD_KAFKA_ERR_ACTION_IGNORE 0x2 /* Error can be ignored */ -#define RD_KAFKA_ERR_ACTION_REFRESH 0x4 /* Refresh state (e.g., metadata) */ -#define RD_KAFKA_ERR_ACTION_INFORM 0x8 /* Inform application about err */ +#define RD_KAFKA_ERR_ACTION_PERMANENT 0x1 /* Permanent error */ +#define RD_KAFKA_ERR_ACTION_IGNORE 0x2 /* Error can be ignored */ +#define RD_KAFKA_ERR_ACTION_REFRESH 0x4 /* Refresh state (e.g., metadata) */ +#define RD_KAFKA_ERR_ACTION_RETRY 0x8 /* Retry request after backoff */ +#define RD_KAFKA_ERR_ACTION_INFORM 0x10 /* Inform application about err */ +#define RD_KAFKA_ERR_ACTION_END 0 /* var-arg sentinel */ static int rd_kafka_err_action (rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request) { + rd_kafka_buf_t *request, ...) { + va_list ap; int actions = 0; + int exp_act; - if (err && rkb && request) + /* Match explicitly defined error mappings first. */ + va_start(ap, request); + while ((exp_act = va_arg(ap, int))) { + int exp_err = va_arg(ap, int); + + if (err == exp_err) + actions |= exp_act; + } + va_end(ap); + + if (err && rkb && request) rd_rkb_dbg(rkb, BROKER, "REQERR", - "%sRequest failed: %s", + "%sRequest failed: %s: explicit actions 0x%x", rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), - rd_kafka_err2str(err)); + rd_kafka_err2str(err), actions); + + /* Explicit error match. 
*/ + if (actions) + return actions; - // FIXME: ILLEGAL_GENERATION + /* Default error matching */ switch (err) { case RD_KAFKA_RESP_ERR_NO_ERROR: break; - case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: @@ -74,6 +91,10 @@ static int rd_kafka_err_action (rd_kafka_broker_t *rkb, /* Request metadata information update */ actions |= RD_KAFKA_ERR_ACTION_REFRESH; break; + case RD_KAFKA_RESP_ERR__TRANSPORT: + /* Broker connection down */ + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; case RD_KAFKA_RESP_ERR__DESTROY: default: actions |= RD_KAFKA_ERR_ACTION_PERMANENT; @@ -169,20 +190,30 @@ rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk, } err: - actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request); + actions = rd_kafka_err_action( + rkb, ErrorCode, rkbuf, request, + RD_KAFKA_ERR_ACTION_PERMANENT, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + + RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, + + RD_KAFKA_ERR_ACTION_END); + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for leader */ rd_kafka_topic_leader_query(rktp->rktp_rkt->rkt_rk, rktp->rktp_rkt); + } + if (actions & RD_KAFKA_ERR_ACTION_RETRY) { /* * Schedule a retry */ - rd_kafka_buf_keep(request); - /* Acquire new rktp refcount for the retry. */ - request->rkbuf_opaque = rd_kafka_toppar_keep(rktp); - rd_kafka_broker_buf_retry(request->rkbuf_rkb, request); - return RD_KAFKA_RESP_ERR__IN_PROGRESS; - } + if (rd_kafka_buf_retry(rkb, 0/*FIXME*/, request)) + return RD_KAFKA_RESP_ERR__IN_PROGRESS; + + /* FALLTHRU */ + } done: if (!ErrorCode && !hit) @@ -195,125 +226,8 @@ rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk, } -/** - * Toppar based OffsetResponse handling. - * This is used for finding the next offset to Fetch. 
- */ -void rd_kafka_toppar_handle_Offset (rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - shptr_rd_kafka_toppar_t *s_rktp = opaque; - rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); - int64_t Offset; - - /* Parse and return Offset */ - err = rd_kafka_handle_Offset(rkb, err, rkbuf, request, rktp, &Offset); - - if (err) { - rd_kafka_op_t *rko; - - rd_rkb_dbg(rkb, TOPIC, "OFFSET", - "Offset reply error for " - "topic %.*s [%"PRId32"]: %s", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, rd_kafka_err2str(err)); - - if (err == RD_KAFKA_RESP_ERR__DESTROY) { - /* Termination, quick cleanup. */ - - /* from request.opaque */ - rd_kafka_toppar_destroy(s_rktp); - - return; - - } else if (err == RD_KAFKA_RESP_ERR__TRANSPORT) { - /* Intermittent connectivity problem, - * return to query state and let the timed - * interval try again. */ - rd_kafka_toppar_lock(rktp); - rd_kafka_toppar_offset_request(rktp, - rktp->rktp_query_offset, - 500); - rd_kafka_toppar_unlock(rktp); - - /* from request.opaque */ - rd_kafka_toppar_destroy(s_rktp); - - return; - } - - - rd_kafka_toppar_lock(rktp); - rd_kafka_offset_reset(rktp, rktp->rktp_query_offset, - err, - "failed to query logical offset"); - - /* Signal error back to application, - * unless this is an intermittent problem - * (e.g.,connection lost) */ - rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR); - rko->rko_err = err; - if (rktp->rktp_query_offset <= - RD_KAFKA_OFFSET_TAIL_BASE) - rko->rko_rkmessage.offset = - rktp->rktp_query_offset - - RD_KAFKA_OFFSET_TAIL_BASE; - else - rko->rko_rkmessage.offset = - rktp->rktp_query_offset; - rd_kafka_toppar_unlock(rktp); - rko->rko_rktp = rd_kafka_toppar_keep(rktp); - rko->rko_rkmessage.partition = rktp->rktp_partition; - - rd_kafka_q_enq(&rktp->rktp_fetchq, rko); - - rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */ - return; - } - - rd_kafka_toppar_lock(rktp); - 
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET", - "Offset %"PRId64" request for %.*s [%"PRId32"] " - "returned offset %s (%"PRId64")", - rktp->rktp_query_offset, - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset); - - rd_kafka_toppar_next_offset_handle(rktp, Offset); - rd_kafka_toppar_unlock(rktp); - - rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */ -} - - -/** - * Toppar based OffsetResponse handling. - * This is used for updating the low water mark for consumer lag. - */ -void rd_kafka_toppar_lag_handle_Offset (rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - shptr_rd_kafka_toppar_t *s_rktp = opaque; - rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); - int64_t Offset; - - /* Parse and return Offset */ - err = rd_kafka_handle_Offset(rkb, err, rkbuf, request, rktp, &Offset); - - if (!err) - rktp->rktp_lo_offset = Offset; - - rktp->rktp_wait_consumer_lag_resp = 0; - - rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */ -} - /** @@ -461,7 +375,8 @@ rd_kafka_handle_OffsetFetch (rd_kafka_t *rk, seen_cnt, offsets ? 
offsets->cnt : -1, rd_kafka_err2str(err)); - actions = rd_kafka_err_action(rkb, err, rkbuf, request); + actions = rd_kafka_err_action(rkb, err, rkbuf, request, + RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ @@ -662,7 +577,8 @@ void rd_kafka_op_handle_OffsetCommit (rd_kafka_t *rk, err: - rd_kafka_err_action(rkb, ErrorCode, rkbuf, request); + rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, + RD_KAFKA_ERR_ACTION_END); if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY && (replyq = rko_orig->rko_replyq)) { @@ -924,7 +840,8 @@ void rd_kafka_handle_SyncGroup (rd_kafka_t *rk, rd_kafka_buf_read_bytes(rkbuf, &MemberState); err: - actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request); + actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, + RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ @@ -1206,7 +1123,8 @@ void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk, } err: - actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request); + actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, + RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ @@ -1277,7 +1195,8 @@ void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk, err: - actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request); + actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, + RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ @@ -1357,7 +1276,8 @@ void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk, rd_kafka_buf_read_i16(rkbuf, &ErrorCode); err: - actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request); + actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request, + RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */