Skip to content

Commit

Permalink
RD_KAFKA_RESP_ERR_INVALID_MSG make it retriable
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Mar 20, 2024
1 parent c3d37db commit 1189944
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ int rd_kafka_err_action(rd_kafka_broker_t *rkb,
case RD_KAFKA_RESP_ERR__TIMED_OUT:
case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
case RD_KAFKA_RESP_ERR_INVALID_MSG:
actions |= RD_KAFKA_ERR_ACTION_RETRY |
RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
break;
Expand Down
38 changes: 38 additions & 0 deletions tests/0143-exponential_backoff_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,42 @@ static void test_fetch_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
SUB_TEST_PASS();
}

/*
* Test if the producer retries the produce request after receiving RD_KAFKA_RESP_ERR_INVALID_MSG
* from the broker.
*/
static void test_produce_retry_invalid_msg(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
rd_kafka_conf_t *conf) {
rd_kafka_t *producer;
rd_kafka_topic_t *rkt;
SUB_TEST();
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

producer = test_create_handle(RD_KAFKA_PRODUCER, conf);
rkt = test_create_producer_topic(producer, topic, NULL);

rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_Produce, 1, RD_KAFKA_RESP_ERR_INVALID_MSG);
test_produce_msgs(producer, rkt, 0, RD_KAFKA_PARTITION_UA, 0, 1,
"hello", 6);
rd_kafka_mock_request_t **requests = NULL;
size_t request_cnt = 0;
int64_t previous_request_ts = -1;
int32_t retry_count = 0;
size_t i;
requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
/* 2 API_VERSIONS, 1 METADATA, 2 PRODUCE */
TEST_ASSERT(request_cnt == 5, "Expected 5 requests, got %zu\n", request_cnt);
TEST_ASSERT(rd_kafka_mock_request_api_key(requests[3]) == RD_KAFKAP_Produce, "Expected Produce, got %d\n", rd_kafka_mock_request_api_key(requests[3]));
TEST_ASSERT(rd_kafka_mock_request_api_key(requests[4]) == RD_KAFKAP_Produce, "Expected Produce, got %d\n", rd_kafka_mock_request_api_key(requests[4]));

free_mock_requests(requests, request_cnt);
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(producer);
rd_kafka_mock_clear_requests(mcluster);
SUB_TEST_PASS();
}

/**
* @brief Exponential Backoff (KIP 580)
* We test all the pipelines which affect the retry mechanism for both
Expand Down Expand Up @@ -555,6 +591,8 @@ int main_0143_exponential_backoff_mock(int argc, char **argv) {
test_fetch_fast_leader_query(mcluster, topic, rd_kafka_conf_dup(conf));
test_produce_fast_leader_query(mcluster, topic,
rd_kafka_conf_dup(conf));
test_produce_retry_invalid_msg(mcluster, topic, rd_kafka_conf_dup(conf));
rd_kafka_mock_stop_request_tracking(mcluster);
test_mock_cluster_destroy(mcluster);
rd_kafka_conf_destroy(conf);
return 0;
Expand Down

0 comments on commit 1189944

Please sign in to comment.