Skip to content

Commit

Permalink
Improved Offset request error handling (exposed by #529)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Feb 11, 2016
1 parent 0109c84 commit 6d251cb
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 140 deletions.
110 changes: 110 additions & 0 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,34 @@ static __inline void rd_kafka_broker_fetch_toppar_del (rd_kafka_broker_t *rkb,



/**
* Toppar based OffsetResponse handling.
* This is used for updating the low water mark for consumer lag.
*/
static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
shptr_rd_kafka_toppar_t *s_rktp = opaque;
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
int64_t Offset;

/* Parse and return Offset */
err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
rkbuf, request, rktp, &Offset);

if (!err)
rktp->rktp_lo_offset = Offset;

rktp->rktp_wait_consumer_lag_resp = 0;

rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
}



/**
* Request information from broker to keep track of consumer lag.
*
Expand Down Expand Up @@ -858,6 +886,88 @@ void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
}




/**
* Toppar based OffsetResponse handling.
* This is used for finding the next offset to Fetch.
*/
static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
shptr_rd_kafka_toppar_t *s_rktp = opaque;
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
int64_t Offset;

/* Parse and return Offset */
err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
rkbuf, request, rktp, &Offset);

if (err) {
rd_kafka_op_t *rko;

rd_rkb_dbg(rkb, TOPIC, "OFFSET",
"Offset reply error for "
"topic %.*s [%"PRId32"]: %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rd_kafka_err2str(err));

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
/* Termination, quick cleanup. */

/* from request.opaque */
rd_kafka_toppar_destroy(s_rktp);

return;
} else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
return; /* Retry in progress */


rd_kafka_toppar_lock(rktp);
rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
err,
"failed to query logical offset");

/* Signal error back to application,
* unless this is an intermittent problem
* (e.g.,connection lost) */
rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
rko->rko_err = err;
if (rktp->rktp_query_offset <=
RD_KAFKA_OFFSET_TAIL_BASE)
rko->rko_rkmessage.offset =
rktp->rktp_query_offset -
RD_KAFKA_OFFSET_TAIL_BASE;
else
rko->rko_rkmessage.offset =
rktp->rktp_query_offset;
rd_kafka_toppar_unlock(rktp);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_rkmessage.partition = rktp->rktp_partition;

rd_kafka_q_enq(&rktp->rktp_fetchq, rko);

rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
return;
}

rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"Offset %"PRId64" request for %.*s [%"PRId32"] "
"returned offset %s (%"PRId64")",
rktp->rktp_query_offset,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);

rd_kafka_toppar_next_offset_handle(rktp, Offset);
rd_kafka_toppar_unlock(rktp);

rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
}

/**
* Send OffsetRequest for toppar.
*
Expand Down
Loading

0 comments on commit 6d251cb

Please sign in to comment.