Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to metadata refresh interruption #4679

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ librdkafka v2.4.0 is a feature release:
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
* Fix to metadata refresh interruption (#4679).


## Upgrade considerations
Expand Down Expand Up @@ -58,6 +59,10 @@ librdkafka v2.4.0 is a feature release:
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).
* Issues: #4577.
Metadata refreshes without partition leader changes could lead to a loop of
metadata calls at fixed intervals. Solved by stopping the metadata refresh when
all existing metadata is non-stale. Happening since 2.3.0 (#4679).

### Consumer fixes

Expand Down
12 changes: 10 additions & 2 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2673,8 +2673,16 @@ rd_kafka_mock_request_copy(rd_kafka_mock_request_t *mrequest) {
return request;
}

/**
 * @brief Destroy a mock request and deallocate it.
 */
void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mrequest) {
        rd_free(mrequest);
}

/**
 * @brief Destroy an array of mock requests:
 *        each element is destroyed, then the array itself is deallocated.
 */
void rd_kafka_mock_request_destroy_array(rd_kafka_mock_request_t **mrequests,
                                         size_t mrequest_cnt) {
        rd_kafka_mock_request_t **end = mrequests + mrequest_cnt;
        rd_kafka_mock_request_t **cur;

        for (cur = mrequests; cur != end; cur++)
                rd_kafka_mock_request_destroy(*cur);
        rd_free(mrequests);
}

static void rd_kafka_mock_request_free(void *element) {
Expand Down
8 changes: 8 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -388,6 +389,13 @@ typedef struct rd_kafka_mock_request_s rd_kafka_mock_request_t;
*/
RD_EXPORT void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mreq);

/**
* @brief Destroy a rd_kafka_mock_request_t * array and deallocate it.
*/
RD_EXPORT void
rd_kafka_mock_request_destroy_array(rd_kafka_mock_request_t **mreqs,
size_t mreq_cnt);

/**
* @brief Get the broker id to which \p mreq was sent.
*/
Expand Down
23 changes: 16 additions & 7 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1277,8 +1277,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
rd_kafka_broker_t **partbrokers;
int leader_cnt = 0;
int old_state;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_updated_leader_epoch = rd_false;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_stale_leader_epoch = rd_false;

if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
Expand Down Expand Up @@ -1328,8 +1328,17 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
upd += rd_kafka_topic_partition_cnt_update(rkt,
mdt->partition_cnt);
if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO))
if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) {
/* FIXME: an offset reset must be triggered
 * when rkt_topic_id wasn't zero.
 * There are no problems in test 0107_topic_recreate
 * if offsets in the new topic are lower than in the
 * previous one, causing an out-of-range error and an
 * offset reset, but the rarer case where they're
 * higher needs to be checked. */
rkt->rkt_topic_id = mdit->topic_id;
}
/* If the metadata times out for a topic (because all brokers
* are down) the state will transition to S_UNKNOWN.
* When updated metadata is eventually received there might
Expand All @@ -1343,7 +1352,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,

/* Update leader for each partition */
for (j = 0; j < mdt->partition_cnt; j++) {
int r;
int r = 0;
rd_kafka_broker_t *leader;
int32_t leader_epoch = mdit->partitions[j].leader_epoch;
rd_kafka_toppar_t *rktp =
Expand All @@ -1362,8 +1371,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
* set to -1, we assume that metadata is not stale. */
if (leader_epoch == -1)
partition_exists_with_no_leader_epoch = rd_true;
else if (rktp->rktp_leader_epoch < leader_epoch)
partition_exists_with_updated_leader_epoch = rd_true;
else if (leader_epoch < rktp->rktp_leader_epoch)
partition_exists_with_stale_leader_epoch = rd_true;


/* Update leader for partition */
Expand All @@ -1386,7 +1395,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
* stale, we can turn off fast leader query. */
if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt &&
(partition_exists_with_no_leader_epoch ||
partition_exists_with_updated_leader_epoch))
!partition_exists_with_stale_leader_epoch))
rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
Expand Down
17 changes: 5 additions & 12 deletions tests/0143-exponential_backoff_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@
const int32_t retry_ms = 100;
const int32_t retry_max_ms = 1000;

/**
 * @brief Free an array of mock requests together with each of its elements.
 */
static void free_mock_requests(rd_kafka_mock_request_t **requests,
                               size_t request_cnt) {
        size_t idx = 0;
        while (idx < request_cnt) {
                rd_kafka_mock_request_destroy(requests[idx]);
                idx++;
        }
        rd_free(requests);
}
/**
* @brief find_coordinator test
* We fail the request with RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
Expand Down Expand Up @@ -112,7 +105,7 @@ static void test_find_coordinator(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_request_timestamp(requests[i]);
}
rd_kafka_destroy(consumer);
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_kafka_mock_clear_requests(mcluster);
SUB_TEST_PASS();
}
Expand Down Expand Up @@ -166,7 +159,7 @@ static void helper_exponential_backoff(rd_kafka_mock_cluster_t *mcluster,
previous_request_ts =
rd_kafka_mock_request_timestamp(requests[i]);
}
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
}
/**
* @brief offset_commit test
Expand Down Expand Up @@ -297,7 +290,7 @@ static void helper_find_coordinator_trigger(rd_kafka_mock_cluster_t *mcluster,
}
}
}
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
if (num_request != 1)
TEST_FAIL("No request was made.");
}
Expand Down Expand Up @@ -451,7 +444,7 @@ static void test_produce_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
}
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(producer);
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_kafka_mock_clear_requests(mcluster);
SUB_TEST_PASS();
}
Expand Down Expand Up @@ -511,7 +504,7 @@ static void test_fetch_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
previous_request_was_Fetch = rd_false;
}
rd_kafka_destroy(consumer);
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_kafka_mock_clear_requests(mcluster);
TEST_ASSERT(
Metadata_after_Fetch,
Expand Down
57 changes: 57 additions & 0 deletions tests/0146-metadata_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@

#include "test.h"

#include "../src/rdkafka_proto.h"

/**
 * @brief Match function: true iff \p request is a Metadata request.
 *        \p opaque is unused.
 */
static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request,
                                     void *opaque) {
        return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Metadata;
}

/**
* @brief Metadata should persists in cache after
Expand Down Expand Up @@ -86,6 +92,55 @@ static void do_test_metadata_persists_in_cache(const char *assignor) {

SUB_TEST_PASS();
}

/**
 * @brief No loop of metadata requests should be started
 *        when a metadata request is made without leader epoch change.
 *        See issue #4577.
 */
static void do_test_fast_metadata_refresh_stops(void) {
        rd_kafka_t *rk;
        const char *bootstraps;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        rd_kafka_conf_t *conf;
        int metadata_requests;

        SUB_TEST_QUICK();

        mcluster = test_mock_cluster_new(3, &bootstraps);
        rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

        test_conf_init(&conf, NULL, 10);
        test_conf_set(conf, "bootstrap.servers", bootstraps);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* This error triggers a metadata refresh but no leader change
         * happened. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR);

        rd_kafka_mock_start_request_tracking(mcluster);
        test_produce_msgs2(rk, topic, 0, 0, 0, 1, NULL, 5);

        /* First call is for getting initial metadata,
         * second one happens after the error,
         * it should stop refreshing metadata after that. */
        metadata_requests = test_mock_wait_maching_requests(
            mcluster, 2, 500, is_metadata_request, NULL);
        TEST_ASSERT(metadata_requests == 2,
                    "Expected 2 metadata requests, got %d", metadata_requests);
        rd_kafka_mock_stop_request_tracking(mcluster);

        rd_kafka_destroy(rk);
        test_mock_cluster_destroy(mcluster);

        SUB_TEST_PASS();
}

/**
* @brief A metadata call for an existing topic, just after subscription,
* must not cause a UNKNOWN_TOPIC_OR_PART error.
Expand Down Expand Up @@ -134,5 +189,7 @@ int main_0146_metadata_mock(int argc, char **argv) {

do_test_metadata_call_before_join();

do_test_fast_metadata_refresh_stops();

return 0;
}
56 changes: 56 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7137,7 +7137,63 @@ rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
return mcluster;
}

/**
 * @brief Get current number of matching requests,
 *        received by mock cluster \p mcluster, matching
 *        function \p match , called with opaque \p opaque .
 */
static size_t test_mock_get_matching_request_cnt(
    rd_kafka_mock_cluster_t *mcluster,
    rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
    void *opaque) {
        size_t total_cnt;
        size_t matched = 0;
        size_t idx;
        rd_kafka_mock_request_t **all;

        all = rd_kafka_mock_get_requests(mcluster, &total_cnt);
        for (idx = 0; idx < total_cnt; idx++) {
                if (match(all[idx], opaque))
                        matched++;
        }

        rd_kafka_mock_request_destroy_array(all, total_cnt);
        return matched;
}

/**
 * @brief Wait until at least \p expected_cnt matching requests
 *        have been received by the mock cluster,
 *        using match function \p match ,
 *        then wait an additional \p confidence_interval_ms for any late
 *        matching requests to arrive.
 *
 * @param expected_cnt Number of expected matching requests.
 * @param confidence_interval_ms Time to wait after \p expected_cnt matching
 *                               requests have been seen.
 * @param match Match function that takes a request and \p opaque.
 * @param opaque Opaque value needed by function \p match.
 *
 * @return Number of matching requests received. Can be less than
 *         \p expected_cnt if the internal wait timed out, letting the
 *         caller's assertions report the shortfall.
 */
size_t test_mock_wait_maching_requests(
    rd_kafka_mock_cluster_t *mcluster,
    size_t expected_cnt,
    int confidence_interval_ms,
    rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
    void *opaque) {
        size_t matching_request_cnt = 0;
        /* Deterministic termination: don't poll forever if the expected
         * requests never arrive, give up after 30s and return whatever
         * count was reached. */
        int64_t abs_timeout = test_clock() + 30 * 1000 * 1000;

        while (matching_request_cnt < expected_cnt &&
               test_clock() < abs_timeout) {
                matching_request_cnt =
                    test_mock_get_matching_request_cnt(mcluster, match, opaque);
                if (matching_request_cnt < expected_cnt)
                        rd_usleep(100 * 1000, 0);
        }

        rd_usleep(confidence_interval_ms * 1000, 0);
        return test_mock_get_matching_request_cnt(mcluster, match, opaque);
}

/**
* @name Sub-tests
Expand Down
8 changes: 6 additions & 2 deletions tests/test.h
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,12 @@ rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms);
void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster);
rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
const char **bootstraps);


size_t test_mock_wait_maching_requests(
rd_kafka_mock_cluster_t *mcluster,
size_t num,
int confidence_interval_ms,
rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
void *opaque);

int test_error_is_not_fatal_cb(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
Expand Down