Skip to content

Commit

Permalink
Support metadata request v6 (#1013)
Browse files Browse the repository at this point in the history
* Support metadata request v6

* Empty commit to trigger circleci

* Another empty commit

* Fix TestDialer, add OfflineReplicas to expected response.

* Fix tests

I'm assuming it's ok to send AllowAutoTopicCreation: true in metadata v6 request, since server auto.create.topics.enable controls this anyway.

* Set OfflineReplicas to empty array when it doesn't exist in the v1 response. Maybe fixing TestDialer for old kafkas

* Fix returning errors from readTopicMetadataV1/6.
  • Loading branch information
mtkopone authored Oct 26, 2022
1 parent c1240f0 commit eb07fa4
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 50 deletions.
121 changes: 87 additions & 34 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,48 +943,101 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
topics = nil
}
}
metadataVersion, err := c.negotiateVersion(metadata, v1, v6)
if err != nil {
return nil, err
}

err = c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
switch metadataVersion {
case v6:
return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true})
default:
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
}
},
func(deadline time.Time, size int) error {
var res metadataResponseV1
partitions, err = c.readPartitionsResponse(metadataVersion, size)
return err
},
)
return
}

if err := c.readResponse(size, &res); err != nil {
return err
}
func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) {
switch metadataVersion {
case v6:
var res metadataResponseV6
if err := c.readResponse(size, &res); err != nil {
return nil, err
}
brokers := readBrokerMetadata(res.Brokers)
return c.readTopicMetadatav6(brokers, res.Topics)
default:
var res metadataResponseV1
if err := c.readResponse(size, &res); err != nil {
return nil, err
}
brokers := readBrokerMetadata(res.Brokers)
return c.readTopicMetadatav1(brokers, res.Topics)
}
}

brokers := make(map[int32]Broker, len(res.Brokers))
for _, b := range res.Brokers {
brokers[b.NodeID] = Broker{
Host: b.Host,
Port: int(b.Port),
ID: int(b.NodeID),
Rack: b.Rack,
}
}
func readBrokerMetadata(brokerMetadata []brokerMetadataV1) map[int32]Broker {
brokers := make(map[int32]Broker, len(brokerMetadata))
for _, b := range brokerMetadata {
brokers[b.NodeID] = Broker{
Host: b.Host,
Port: int(b.Port),
ID: int(b.NodeID),
Rack: b.Rack,
}
}
return brokers
}

for _, t := range res.Topics {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
return Error(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
})
}
}
return nil
},
)
func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []topicMetadataV1) (partitions []Partition, err error) {
for _, t := range topicMetadata {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
return nil, Error(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
OfflineReplicas: []Broker{},
})
}
}
return
}

func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []topicMetadataV6) (partitions []Partition, err error) {
for _, t := range topicMetadata {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
return nil, Error(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
OfflineReplicas: makeBrokers(brokers, p.OfflineReplicas...),
})
}
}
return
}

Expand Down
33 changes: 18 additions & 15 deletions dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {

want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
OfflineReplicas: []Broker{},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
Expand Down Expand Up @@ -230,11 +231,12 @@ func TestDialerTLS(t *testing.T) {

want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
OfflineReplicas: []Broker{},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
Expand Down Expand Up @@ -377,11 +379,12 @@ func TestDialerResolver(t *testing.T) {

want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
OfflineReplicas: []Broker{},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
Expand Down
3 changes: 3 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Partition struct {
Replicas []Broker
Isr []Broker

// Available only with metadata API level >= 6:
OfflineReplicas []Broker

// An error that may have occurred while attempting to read the partition
// metadata.
//
Expand Down
86 changes: 86 additions & 0 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,89 @@ func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
wb.writeInt32Array(p.Replicas)
wb.writeInt32Array(p.Isr)
}

type topicMetadataRequestV6 struct {
Topics []string
AllowAutoTopicCreation bool
}

func (r topicMetadataRequestV6) size() int32 {
return sizeofStringArray([]string(r.Topics)) + 1
}

func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) {
// communicate nil-ness to the broker by passing -1 as the array length.
// for this particular request, the broker interpets a zero length array
// as a request for no topics whereas a nil array is for all topics.
if r.Topics == nil {
wb.writeArrayLen(-1)
} else {
wb.writeStringArray([]string(r.Topics))
}
wb.writeBool(r.AllowAutoTopicCreation)
}

type metadataResponseV6 struct {
ThrottleTimeMs int32
Brokers []brokerMetadataV1
ClusterId string
ControllerID int32
Topics []topicMetadataV6
}

func (r metadataResponseV6) size() int32 {
n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
n2 := sizeofNullableString(&r.ClusterId)
n3 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
return 4 + 4 + n1 + n2 + n3
}

func (r metadataResponseV6) writeTo(wb *writeBuffer) {
wb.writeInt32(r.ThrottleTimeMs)
wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
wb.writeString(r.ClusterId)
wb.writeInt32(r.ControllerID)
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}

type topicMetadataV6 struct {
TopicErrorCode int16
TopicName string
Internal bool
Partitions []partitionMetadataV6
}

func (t topicMetadataV6) size() int32 {
return 2 + 1 +
sizeofString(t.TopicName) +
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
}

func (t topicMetadataV6) writeTo(wb *writeBuffer) {
wb.writeInt16(t.TopicErrorCode)
wb.writeString(t.TopicName)
wb.writeBool(t.Internal)
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}

type partitionMetadataV6 struct {
PartitionErrorCode int16
PartitionID int32
Leader int32
Replicas []int32
Isr []int32
OfflineReplicas []int32
}

func (p partitionMetadataV6) size() int32 {
return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr) + sizeofInt32Array(p.OfflineReplicas)
}

func (p partitionMetadataV6) writeTo(wb *writeBuffer) {
wb.writeInt16(p.PartitionErrorCode)
wb.writeInt32(p.PartitionID)
wb.writeInt32(p.Leader)
wb.writeInt32Array(p.Replicas)
wb.writeInt32Array(p.Isr)
wb.writeInt32Array(p.OfflineReplicas)
}
3 changes: 2 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ const (
v2 = 2
v3 = 3
v5 = 5
v6 = 6
v7 = 7
v10 = 10

// Unused protocol versions: v4, v6, v8, v9.
// Unused protocol versions: v4, v8, v9.
)

var apiKeyStrings = [...]string{
Expand Down
24 changes: 24 additions & 0 deletions protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ func TestProtocol(t *testing.T) {
},
},

topicMetadataRequestV6{
Topics: []string{"A", "B", "C"},
AllowAutoTopicCreation: true,
},

metadataResponseV6{
Brokers: []brokerMetadataV1{
{NodeID: 1, Host: "localhost", Port: 9001},
{NodeID: 2, Host: "localhost", Port: 9002, Rack: "rack2"},
},
ClusterId: "cluster",
ControllerID: 2,
Topics: []topicMetadataV6{
{TopicErrorCode: 0, Internal: true, Partitions: []partitionMetadataV6{{
PartitionErrorCode: 0,
PartitionID: 1,
Leader: 2,
Replicas: []int32{1},
Isr: []int32{1},
OfflineReplicas: []int32{1},
}}},
},
},

listOffsetRequestV1{
ReplicaID: 1,
Topics: []listOffsetRequestTopicV1{
Expand Down

0 comments on commit eb07fa4

Please sign in to comment.