Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata cache by topic id #4676

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ librdkafka v2.3.1 is a maintenance release:
max period of 1 ms (#4671).
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)


## Fixes
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ rd_kafka_admin_MetadataRequest(rd_kafka_broker_t *rkb,
rd_kafka_replyq_t replyq,
void *opaque) {
return rd_kafka_MetadataRequest_resp_cb(
rkb, topics, reason,
rkb, topics, NULL, reason,
rd_false /* No admin operation requires topic creation. */,
include_cluster_authorized_operations,
include_topic_authorized_operations,
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -6462,10 +6462,10 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,

rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC)
return;

if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_GENERIC)
if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
return;

/*
Expand Down
132 changes: 94 additions & 38 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_internal_t *mdi = NULL;
rd_kafka_metadata_t *md = NULL;
size_t rkb_namelen;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;

const rd_list_t *requested_topics = request_topics;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;
rd_list_t *missing_topic_ids = NULL;

const rd_list_t *requested_topics = request_topics;
const rd_list_t *requested_topic_ids = NULL;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
rd_bool_t has_reliable_leader_epochs =
rd_kafka_has_reliable_leader_epochs(rkb);
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
Expand All @@ -496,8 +498,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_bool_t compute_racks = has_client_rack;

if (request) {
requested_topics = request->rkbuf_u.Metadata.topics;
all_topics = request->rkbuf_u.Metadata.all_topics;
requested_topics = request->rkbuf_u.Metadata.topics;
requested_topic_ids = request->rkbuf_u.Metadata.topic_ids;
all_topics = request->rkbuf_u.Metadata.all_topics;
cgrp_update =
request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
compute_racks |= request->rkbuf_u.Metadata.force_racks;
Expand All @@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
if (requested_topics)
missing_topics =
rd_list_copy(requested_topics, rd_list_string_copy, NULL);
if (requested_topic_ids)
missing_topic_ids =
rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL);

rd_kafka_broker_lock(rkb);
rkb_namelen = strlen(rkb->rkb_name) + 1;
Expand Down Expand Up @@ -635,6 +641,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
} else {
mdi->topics[i].topic_id = RD_KAFKA_UUID_ZERO;
}

if (ApiVersion >= 1)
Expand Down Expand Up @@ -831,39 +839,42 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
&mdi->topics[i]);

// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
if (requested_topics)
rd_list_free_cb(missing_topics,
rd_list_remove_cmp(missing_topics,
md->topics[i].topic,
(void *)strcmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
if (requested_topic_ids)
rd_list_free_cb(
missing_topic_ids,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers, md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
}

// TODO: Should be done for missing_topic_ids as well.
/* Requested topics not seen in metadata? Propagate to topic code. */
if (missing_topics) {
char *topic;
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"%d/%d requested topic(s) seen in metadata",
"%d/%d requested topic(s) seen in metadata"
" (lookup by name)",
rd_list_cnt(requested_topics) -
rd_list_cnt(missing_topics),
rd_list_cnt(requested_topics));
Expand All @@ -890,6 +901,42 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}
}
}
if (missing_topic_ids) {
rd_kafka_Uuid_t *topic_id;
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"%d/%d requested topic(s) seen in metadata"
" (lookup by id)",
rd_list_cnt(requested_topic_ids) -
rd_list_cnt(missing_topic_ids),
rd_list_cnt(requested_topic_ids));
for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) {
rd_kafka_Uuid_t *missing_topic_id =
missing_topic_ids->rl_elems[i];
rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
rd_kafka_Uuid_base64str(missing_topic_id));
}
RD_LIST_FOREACH(topic_id, missing_topic_ids, i) {
rd_kafka_topic_t *rkt;

rd_kafka_rdlock(rk);
rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk,
*topic_id);
rd_kafka_rdunlock(rk);
if (rkt) {
/* Received metadata response contained no
* information about topic 'rkt' and thus
* indicates the topic is not available in the
* cluster.
* Mark the topic as non-existent */
rd_kafka_topic_wrlock(rkt);
rd_kafka_topic_set_notexists(
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
rd_kafka_topic_wrunlock(rkt);

rd_kafka_topic_destroy0(rkt);
}
}
}


rd_kafka_wrlock(rkb->rkb_rk);
Expand Down Expand Up @@ -959,12 +1006,12 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}


// TODO: Should be done for requested_topic_ids as well.
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
if (requested_topic_ids)
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);

rd_kafka_wrunlock(rkb->rkb_rk);

Expand All @@ -980,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
* which may contain only a sub-set of the subscribed topics (namely
* the effective subscription of available topics) as to not
* propagate non-included topics as non-existent. */
if (cgrp_update && (requested_topics || all_topics))
if (cgrp_update &&
(requested_topics || requested_topic_ids || all_topics))
rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
rd_true /*do join*/);

Expand All @@ -993,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

done:

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);

/* This metadata request was triggered by someone wanting
* the metadata information back as a reply, so send that reply now.
Expand All @@ -1011,18 +1059,26 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
err_parse:
err = rkbuf->rkbuf_err;
err:
// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
rd_kafka_wrunlock(rkb->rkb_rk);
}
if (requested_topic_ids) {
/* Failed requests shall purge cache hints for
 * the requested topic ids. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);
rd_kafka_wrunlock(rkb->rkb_rk);
}

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);
rd_tmpabuf_destroy(&tbuf);

return err;
Expand Down
12 changes: 11 additions & 1 deletion src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor,
*/

struct rd_kafka_metadata_cache_entry {
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */
TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
rd_ts_t rkmce_ts_expires; /* Expire time */
rd_ts_t rkmce_ts_insert; /* Insert time */
Expand All @@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry {

struct rd_kafka_metadata_cache {
rd_avl_t rkmc_avl;
rd_avl_t rkmc_avl_by_id;
TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
rd_kafka_timer_t rkmc_expiry_tmr;
int rkmc_cnt;
Expand All @@ -269,6 +271,8 @@ struct rd_kafka_metadata_cache {


int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
Expand All @@ -282,8 +286,14 @@ void rd_kafka_metadata_cache_topic_update(
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id,
int valid);
void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk,
const rd_list_t *topics);
void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk,
const rd_list_t *topic_ids);
int rd_kafka_metadata_cache_hint(rd_kafka_t *rk,
const rd_list_t *topics,
rd_list_t *dst,
Expand Down
Loading