From eb07fa4224a235649958df1019d264ac468a8548 Mon Sep 17 00:00:00 2001 From: Mikko Koponen Date: Thu, 27 Oct 2022 00:37:07 +0300 Subject: [PATCH] Support metadata request v6 (#1013) * Support metadata request v6 * Empty commit to trigger circleci * Another empty commit * Fix TestDialer, add OfflineReplicas to expected response. * Fix tests I'm assuming it's ok to send AllowAutoTopicCreation: true in metadata v6 request, since server auto.create.topics.enable controls this anyway. * Set OfflineReplicas to empty array when it doesn't exist in the v1 response. Maybe fixing TestDialer for old kafkas * Fix returning errors from readTopicMetadataV1/6. --- conn.go | 121 ++++++++++++++++++++++++++++++++++------------- dialer_test.go | 33 +++++++------ kafka.go | 3 ++ metadata.go | 86 +++++++++++++++++++++++++++++++++ protocol.go | 3 +- protocol_test.go | 24 ++++++++++ 6 files changed, 220 insertions(+), 50 deletions(-) diff --git a/conn.go b/conn.go index 35d131018..ce226767f 100644 --- a/conn.go +++ b/conn.go @@ -943,48 +943,101 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err topics = nil } } + metadataVersion, err := c.negotiateVersion(metadata, v1, v6) + if err != nil { + return nil, err + } err = c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics)) + switch metadataVersion { + case v6: + return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true}) + default: + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics)) + } }, func(deadline time.Time, size int) error { - var res metadataResponseV1 + partitions, err = c.readPartitionsResponse(metadataVersion, size) + return err + }, + ) + return +} - if err := c.readResponse(size, &res); err != nil { - return err - } +func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) { + switch metadataVersion { + case v6: + var res metadataResponseV6 + if err := c.readResponse(size, &res); err != nil { + return nil, err + } + brokers := readBrokerMetadata(res.Brokers) + return c.readTopicMetadatav6(brokers, res.Topics) + default: + var res metadataResponseV1 + if err := c.readResponse(size, &res); err != nil { + return nil, err + } + brokers := readBrokerMetadata(res.Brokers) + return c.readTopicMetadatav1(brokers, res.Topics) + } +} - brokers := make(map[int32]Broker, len(res.Brokers)) - for _, b := range res.Brokers { - brokers[b.NodeID] = Broker{ - Host: b.Host, - Port: int(b.Port), - ID: int(b.NodeID), - Rack: b.Rack, - } - } +func readBrokerMetadata(brokerMetadata []brokerMetadataV1) map[int32]Broker { + brokers := make(map[int32]Broker, len(brokerMetadata)) + for _, b := range brokerMetadata { + brokers[b.NodeID] = Broker{ + Host: b.Host, + Port: int(b.Port), + ID: int(b.NodeID), + Rack: b.Rack, + } + } + return brokers +} - for _, t := range res.Topics { - if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) { - // We only report errors if they happened for the topic of - // the connection, otherwise the topic will simply have no - // partitions in the result set. - return Error(t.TopicErrorCode) - } - for _, p := range t.Partitions { - partitions = append(partitions, Partition{ - Topic: t.TopicName, - Leader: brokers[p.Leader], - Replicas: makeBrokers(brokers, p.Replicas...), - Isr: makeBrokers(brokers, p.Isr...), - ID: int(p.PartitionID), - }) - } - } - return nil - }, - ) +func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []topicMetadataV1) (partitions []Partition, err error) { + for _, t := range topicMetadata { + if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) { + // We only report errors if they happened for the topic of + // the connection, otherwise the topic will simply have no + // partitions in the result set. + return nil, Error(t.TopicErrorCode) + } + for _, p := range t.Partitions { + partitions = append(partitions, Partition{ + Topic: t.TopicName, + Leader: brokers[p.Leader], + Replicas: makeBrokers(brokers, p.Replicas...), + Isr: makeBrokers(brokers, p.Isr...), + ID: int(p.PartitionID), + OfflineReplicas: []Broker{}, + }) + } + } + return +} + +func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []topicMetadataV6) (partitions []Partition, err error) { + for _, t := range topicMetadata { + if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) { + // We only report errors if they happened for the topic of + // the connection, otherwise the topic will simply have no + // partitions in the result set. + return nil, Error(t.TopicErrorCode) + } + for _, p := range t.Partitions { + partitions = append(partitions, Partition{ + Topic: t.TopicName, + Leader: brokers[p.Leader], + Replicas: makeBrokers(brokers, p.Replicas...), + Isr: makeBrokers(brokers, p.Isr...), + ID: int(p.PartitionID), + OfflineReplicas: makeBrokers(brokers, p.OfflineReplicas...), + }) + } + } return } diff --git a/dialer_test.go b/dialer_test.go index 7bc9e58c7..4c8b813f3 100644 --- a/dialer_test.go +++ b/dialer_test.go @@ -61,11 +61,12 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) { want := []Partition{ { - Topic: topic, - Leader: Broker{Host: "localhost", Port: 9092, ID: 1}, - Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, - Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, - ID: 0, + Topic: topic, + Leader: Broker{Host: "localhost", Port: 9092, ID: 1}, + Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, + Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, + OfflineReplicas: []Broker{}, + ID: 0, }, } if !reflect.DeepEqual(partitions, want) { @@ -230,11 +231,12 @@ func TestDialerTLS(t *testing.T) { want := []Partition{ { - Topic: topic, - Leader: Broker{Host: "localhost", Port: 9092, ID: 1}, - Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, - Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, - ID: 0, + Topic: topic, + Leader: Broker{Host: "localhost", Port: 9092, ID: 1}, + Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, + Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, + OfflineReplicas: []Broker{}, + ID: 0, }, } if !reflect.DeepEqual(partitions, want) { @@ -377,11 +379,12 @@ func TestDialerResolver(t *testing.T) { want := []Partition{ { - Topic: topic, - Leader: Broker{Host: "localhost", Port: 9092, ID: 1}, - Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, - Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, - ID: 0, + Topic: topic, + Leader: Broker{Host: "localhost", Port: 9092, ID: 1}, + Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, + Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}}, + OfflineReplicas: []Broker{}, + ID: 0, }, } if !reflect.DeepEqual(partitions, want) { diff --git a/kafka.go b/kafka.go index ec139ac91..d2d36e413 100644 --- a/kafka.go +++ b/kafka.go @@ -47,6 +47,9 @@ type Partition struct { Replicas []Broker Isr []Broker + // Available only with metadata API level >= 6: + OfflineReplicas []Broker + // An error that may have occurred while attempting to read the partition // metadata. // diff --git a/metadata.go b/metadata.go index 04ef287c1..6946cb1e4 100644 --- a/metadata.go +++ b/metadata.go @@ -203,3 +203,89 @@ func (p partitionMetadataV1) writeTo(wb *writeBuffer) { wb.writeInt32Array(p.Replicas) wb.writeInt32Array(p.Isr) } + +type topicMetadataRequestV6 struct { + Topics []string + AllowAutoTopicCreation bool +} + +func (r topicMetadataRequestV6) size() int32 { + return sizeofStringArray([]string(r.Topics)) + 1 +} + +func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) { + // communicate nil-ness to the broker by passing -1 as the array length. + // for this particular request, the broker interpets a zero length array + // as a request for no topics whereas a nil array is for all topics. + if r.Topics == nil { + wb.writeArrayLen(-1) + } else { + wb.writeStringArray([]string(r.Topics)) + } + wb.writeBool(r.AllowAutoTopicCreation) +} + +type metadataResponseV6 struct { + ThrottleTimeMs int32 + Brokers []brokerMetadataV1 + ClusterId string + ControllerID int32 + Topics []topicMetadataV6 +} + +func (r metadataResponseV6) size() int32 { + n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() }) + n2 := sizeofNullableString(&r.ClusterId) + n3 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() }) + return 4 + 4 + n1 + n2 + n3 +} + +func (r metadataResponseV6) writeTo(wb *writeBuffer) { + wb.writeInt32(r.ThrottleTimeMs) + wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) }) + wb.writeString(r.ClusterId) + wb.writeInt32(r.ControllerID) + wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) }) +} + +type topicMetadataV6 struct { + TopicErrorCode int16 + TopicName string + Internal bool + Partitions []partitionMetadataV6 +} + +func (t topicMetadataV6) size() int32 { + return 2 + 1 + + sizeofString(t.TopicName) + + sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() }) +} + +func (t topicMetadataV6) writeTo(wb *writeBuffer) { + wb.writeInt16(t.TopicErrorCode) + wb.writeString(t.TopicName) + wb.writeBool(t.Internal) + wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) }) +} + +type partitionMetadataV6 struct { + PartitionErrorCode int16 + PartitionID int32 + Leader int32 + Replicas []int32 + Isr []int32 + OfflineReplicas []int32 +} + +func (p partitionMetadataV6) size() int32 { + return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr) + sizeofInt32Array(p.OfflineReplicas) +} + +func (p partitionMetadataV6) writeTo(wb *writeBuffer) { + wb.writeInt16(p.PartitionErrorCode) + wb.writeInt32(p.PartitionID) + wb.writeInt32(p.Leader) + wb.writeInt32Array(p.Replicas) + wb.writeInt32Array(p.Isr) + wb.writeInt32Array(p.OfflineReplicas) +} diff --git a/protocol.go b/protocol.go index 829fabf54..37208abf1 100644 --- a/protocol.go +++ b/protocol.go @@ -107,10 +107,11 @@ const ( v2 = 2 v3 = 3 v5 = 5 + v6 = 6 v7 = 7 v10 = 10 - // Unused protocol versions: v4, v6, v8, v9. + // Unused protocol versions: v4, v8, v9. ) var apiKeyStrings = [...]string{ diff --git a/protocol_test.go b/protocol_test.go index d1f0540fe..7a295949a 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -76,6 +76,30 @@ func TestProtocol(t *testing.T) { }, }, + topicMetadataRequestV6{ + Topics: []string{"A", "B", "C"}, + AllowAutoTopicCreation: true, + }, + + metadataResponseV6{ + Brokers: []brokerMetadataV1{ + {NodeID: 1, Host: "localhost", Port: 9001}, + {NodeID: 2, Host: "localhost", Port: 9002, Rack: "rack2"}, + }, + ClusterId: "cluster", + ControllerID: 2, + Topics: []topicMetadataV6{ + {TopicErrorCode: 0, Internal: true, Partitions: []partitionMetadataV6{{ + PartitionErrorCode: 0, + PartitionID: 1, + Leader: 2, + Replicas: []int32{1}, + Isr: []int32{1}, + OfflineReplicas: []int32{1}, + }}}, + }, + }, + listOffsetRequestV1{ ReplicaID: 1, Topics: []listOffsetRequestTopicV1{