Skip to content

Commit

Permalink
resp_cb: need to pass rk since rkb is not always passed
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Feb 11, 2016
1 parent 1d79e20 commit 0109c84
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 60 deletions.
6 changes: 4 additions & 2 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2204,7 +2204,8 @@ struct list_groups_state {
int grplist_size;
};

static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_broker_t *rkb,
static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -2310,7 +2311,8 @@ static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_broker_t *rkb,
state->err = err;
}

static void rd_kafka_ListGroups_resp_cb (rd_kafka_broker_t *rkb,
static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
Expand Down
21 changes: 12 additions & 9 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,9 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
if (is_waitresp_q && rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING)
rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1);

rd_kafka_buf_callback(rkb, RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
NULL, rkbuf);
rd_kafka_buf_callback(rkb->rkb_rk, rkb,
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
NULL, rkbuf);
cnt++;
}

Expand Down Expand Up @@ -573,11 +574,11 @@ void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
*/
static int rd_kafka_broker_buf_enq2 (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf) {
if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL /* FIXME||
rkb->rkb_state < RD_KAFKA_BROKER_STATE_UP */)) {
if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL)) {
/* Fail request immediately if this is the internal broker. */
// FIXME there is no broker connection. */
rd_kafka_buf_callback(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
rd_kafka_buf_callback(rkb->rkb_rk, rkb,
RD_KAFKA_RESP_ERR__TRANSPORT,
NULL, rkbuf);
return -1;
}
Expand Down Expand Up @@ -909,7 +910,7 @@ static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
(float)req->rkbuf_ts_sent / 1000.0f);

/* Call callback. */
rd_kafka_buf_callback(rkb, 0, rkbuf, req);
rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, rkbuf, req);

return 0;
}
Expand Down Expand Up @@ -1285,7 +1286,7 @@ int rd_kafka_send (rd_kafka_broker_t *rkb) {
if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE))
rd_kafka_bufq_enq(&rkb->rkb_waitresps, rkbuf);
else { /* Call buffer callback for delivery report. */
rd_kafka_buf_callback(rkb, 0, NULL, rkbuf);
rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
}

cnt++;
Expand Down Expand Up @@ -1434,7 +1435,8 @@ rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb,
/**
* Locality: io thread
*/
static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
static void rd_kafka_produce_msgset_reply (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -3013,7 +3015,8 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,



static void rd_kafka_broker_fetch_reply (rd_kafka_broker_t *rkb,
static void rd_kafka_broker_fetch_reply (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
Expand Down
12 changes: 8 additions & 4 deletions src/rdkafka_buf.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
rd_atomic32_get(&rkbufq->rkbq_cnt));

TAILQ_FOREACH_SAFE(rkbuf, &rkbufq->rkbq_bufs, rkbuf_link, tmp) {
rd_kafka_buf_callback(rkb, err, NULL, rkbuf);
rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
}
}

Expand Down Expand Up @@ -419,10 +419,13 @@ void rd_kafka_buf_handle_op (rd_kafka_op_t *rko) {
request->rkbuf_response = NULL;
rko->rko_rkbuf = NULL;

rd_kafka_buf_callback(request->rkbuf_rkb, rko->rko_err,
rd_kafka_buf_callback(request->rkbuf_rkb->rkb_rk,
request->rkbuf_rkb, rko->rko_err,
response, request);
}



/**
* Call request.rkbuf_cb(), but:
* - if the rkbuf has a rkbuf_replyq the buffer is enqueued on that queue
Expand All @@ -433,7 +436,8 @@ void rd_kafka_buf_handle_op (rd_kafka_op_t *rko) {
*
* Will decrease refcount for both response and request, eventually.
*/
void rd_kafka_buf_callback (rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
void rd_kafka_buf_callback (rd_kafka_t *rk,
rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
rd_kafka_buf_t *response, rd_kafka_buf_t *request) {

/* Decide if the request should be retried.
Expand Down Expand Up @@ -469,7 +473,7 @@ void rd_kafka_buf_callback (rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
}

if (request->rkbuf_cb)
request->rkbuf_cb(rkb, err, response, request,
request->rkbuf_cb(rk, rkb, err, response, request,
request->rkbuf_opaque);

rd_kafka_buf_destroy(request);
Expand Down
8 changes: 6 additions & 2 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ typedef struct rd_kafka_broker_s rd_kafka_broker_t;
* toppar, queue, etc) and the callback may not be called in the
* correct thread. In this case the callback must perform just
* the most minimal cleanup and dont trigger any other operations.
*
* NOTE: rkb, reply and request may be NULL, depending on error situation.
*/
typedef void (rd_kafka_resp_cb_t) (rd_kafka_broker_t *rkb,
typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -312,7 +315,8 @@ int rd_kafka_buf_retry (rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf);

void rd_kafka_buf_handle_op (rd_kafka_op_t *rko);
void rd_kafka_buf_callback (rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
void rd_kafka_buf_callback (rd_kafka_t *rk,
rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
rd_kafka_buf_t *response, rd_kafka_buf_t *request);


Expand Down
20 changes: 11 additions & 9 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ void rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg, int32_t coord_id) {
/**
* Handle GroupCoordinator response
*/
static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_broker_t *rkb,
static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -454,7 +455,7 @@ static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg, int ignore_response) {
ignore_response ? NULL :
rd_kafka_handle_LeaveGroup, rkcg);
else if (!ignore_response)
rd_kafka_handle_LeaveGroup(rkcg->rkcg_rkb,
rd_kafka_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_rkb,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, rkcg);
}
Expand Down Expand Up @@ -636,9 +637,9 @@ rd_kafka_cgrp_offsets_fetch (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb,
use_offsets = rd_kafka_topic_partition_list_copy(offsets);

if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
rd_kafka_cgrp_offsets_fetch_response(rkb,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, use_offsets);
rd_kafka_cgrp_offsets_fetch_response(
rkcg->rkcg_rk, rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, use_offsets);
else
rd_kafka_OffsetFetchRequest(
rkb, 1, offsets,
Expand Down Expand Up @@ -794,7 +795,7 @@ static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t
*offsets) {
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
rd_kafka_op_handle_OffsetCommit(rkb,
rd_kafka_op_handle_OffsetCommit(rkcg->rkcg_rk, rkb,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL,
rko);
Expand All @@ -804,7 +805,7 @@ static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
&rkcg->rkcg_ops,
rd_kafka_op_handle_OffsetCommit, rko) == 0)
/* No valid offsets */
rd_kafka_op_handle_OffsetCommit(rkb,
rd_kafka_op_handle_OffsetCommit(rkcg->rkcg_rk, rkb,
RD_KAFKA_RESP_ERR__NO_OFFSET,
NULL, NULL,
rko);
Expand Down Expand Up @@ -1327,7 +1328,8 @@ static void rd_kafka_cgrp_op_serve (rd_kafka_cgrp_t *rkcg,
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
rd_kafka_op_handle_OffsetFetch(
rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
rkcg->rkcg_rk, rkb,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, rko);
rko = NULL; /* rko freed by handler */
break;
Expand Down Expand Up @@ -1410,7 +1412,7 @@ static void rd_kafka_cgrp_op_serve (rd_kafka_cgrp_t *rkcg,
rkcg->rkcg_group_id->str);

rd_kafka_op_handle_OffsetCommit(
rkcg->rkcg_rkb,
rkcg->rkcg_rk, rkcg->rkcg_rkb,
RD_KAFKA_RESP_ERR__NO_OFFSET,
NULL, NULL, rko);
rko = NULL; /* freed by op_handle */
Expand Down
43 changes: 27 additions & 16 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ static int rd_kafka_err_action (rd_kafka_broker_t *rkb,
}


static void rd_kafka_assignor_handle_Metadata (rd_kafka_broker_t *rkb,
static void rd_kafka_assignor_handle_Metadata (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -115,12 +116,13 @@ void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
* Returns the parsed Offset in '*Offsetp'.
* Returns 0 on success, else an error.
*/
static rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
rd_kafka_toppar_t *rktp,
int64_t *Offsetp) {
rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
rd_kafka_toppar_t *rktp,
int64_t *Offsetp) {
const int log_decode_errors = 1;
int16_t ErrorCode = 0;
int32_t TopicArrayCnt;
Expand Down Expand Up @@ -368,7 +370,8 @@ void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
* 'offsets' list.
*/
rd_kafka_resp_err_t
rd_kafka_handle_OffsetFetch (rd_kafka_broker_t *rkb,
rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -485,7 +488,8 @@ rd_kafka_handle_OffsetFetch (rd_kafka_broker_t *rkb,
*
* Locality: cgrp's broker thread
*/
void rd_kafka_op_handle_OffsetFetch (rd_kafka_broker_t *rkb,
void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -602,7 +606,8 @@ void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
* Handle OffsetCommitResponse
* Takes the original 'rko' as opaque argument.
*/
void rd_kafka_op_handle_OffsetCommit (rd_kafka_broker_t *rkb,
void rd_kafka_op_handle_OffsetCommit (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -898,7 +903,8 @@ void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
* Handler for SyncGroup responses
* opaque must be the cgrp handle.
*/
void rd_kafka_handle_SyncGroup (rd_kafka_broker_t *rkb,
void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -1073,7 +1079,8 @@ rd_kafka_group_MemberMetadata_consumer_read (
*
* Locality: cgrp broker thread
*/
void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_broker_t *rkb,
void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -1250,7 +1257,8 @@ void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
* Handler for LeaveGroup responses
* opaque must be the cgrp handle.
*/
void rd_kafka_handle_LeaveGroup (rd_kafka_broker_t *rkb,
void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -1330,7 +1338,8 @@ void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
* Generic handler for Heartbeat responses.
* opaque must be the cgrp handle.
*/
void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_broker_t *rkb,
void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -1694,7 +1703,8 @@ rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
*
* Locality: rdkafka main thread
*/
void rd_kafka_op_handle_Metadata (rd_kafka_broker_t *rkb,
void rd_kafka_op_handle_Metadata (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down Expand Up @@ -1751,7 +1761,8 @@ void rd_kafka_op_handle_Metadata (rd_kafka_broker_t *rkb,
}
}

static void rd_kafka_assignor_handle_Metadata (rd_kafka_broker_t *rkb,
static void rd_kafka_assignor_handle_Metadata (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
Expand Down
Loading

0 comments on commit 0109c84

Please sign in to comment.