Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support metadata request v6 #1013

Merged
merged 7 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 87 additions & 34 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,48 +943,101 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
topics = nil
}
}
metadataVersion, err := c.negotiateVersion(metadata, v1, v6)
if err != nil {
return nil, err
}

err = c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
switch metadataVersion {
case v6:
return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true})
default:
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
}
},
func(deadline time.Time, size int) error {
var res metadataResponseV1
partitions, err = c.readPartitionsResponse(metadataVersion, size)
return err
},
)
return
}

if err := c.readResponse(size, &res); err != nil {
return err
}
func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) {
switch metadataVersion {
case v6:
var res metadataResponseV6
if err := c.readResponse(size, &res); err != nil {
return nil, err
}
brokers := readBrokerMetadata(res.Brokers)
return c.readTopicMetadatav6(brokers, res.Topics)
default:
var res metadataResponseV1
if err := c.readResponse(size, &res); err != nil {
return nil, err
}
brokers := readBrokerMetadata(res.Brokers)
return c.readTopicMetadatav1(brokers, res.Topics)
}
}

brokers := make(map[int32]Broker, len(res.Brokers))
for _, b := range res.Brokers {
brokers[b.NodeID] = Broker{
Host: b.Host,
Port: int(b.Port),
ID: int(b.NodeID),
Rack: b.Rack,
}
}
func readBrokerMetadata(brokerMetadata []brokerMetadataV1) map[int32]Broker {
brokers := make(map[int32]Broker, len(brokerMetadata))
for _, b := range brokerMetadata {
brokers[b.NodeID] = Broker{
Host: b.Host,
Port: int(b.Port),
ID: int(b.NodeID),
Rack: b.Rack,
}
}
return brokers
}

for _, t := range res.Topics {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
return Error(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
})
}
}
return nil
},
)
func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []topicMetadataV1) (partitions []Partition, err error) {
for _, t := range topicMetadata {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
return nil, Error(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
OfflineReplicas: []Broker{},
})
}
}
return
}

func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []topicMetadataV6) (partitions []Partition, err error) {
for _, t := range topicMetadata {
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
// We only report errors if they happened for the topic of
// the connection, otherwise the topic will simply have no
// partitions in the result set.
return nil, Error(t.TopicErrorCode)
}
for _, p := range t.Partitions {
partitions = append(partitions, Partition{
Topic: t.TopicName,
Leader: brokers[p.Leader],
Replicas: makeBrokers(brokers, p.Replicas...),
Isr: makeBrokers(brokers, p.Isr...),
ID: int(p.PartitionID),
OfflineReplicas: makeBrokers(brokers, p.OfflineReplicas...),
})
}
}
return
}

Expand Down
33 changes: 18 additions & 15 deletions dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {

want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
OfflineReplicas: []Broker{},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
Expand Down Expand Up @@ -230,11 +231,12 @@ func TestDialerTLS(t *testing.T) {

want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
OfflineReplicas: []Broker{},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
Expand Down Expand Up @@ -377,11 +379,12 @@ func TestDialerResolver(t *testing.T) {

want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
OfflineReplicas: []Broker{},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
Expand Down
3 changes: 3 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Partition struct {
Replicas []Broker
Isr []Broker

// Available only with metadata API level >= 6:
OfflineReplicas []Broker

// An error that may have occurred while attempting to read the partition
// metadata.
//
Expand Down
86 changes: 86 additions & 0 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,89 @@ func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
wb.writeInt32Array(p.Replicas)
wb.writeInt32Array(p.Isr)
}

type topicMetadataRequestV6 struct {
Topics []string
AllowAutoTopicCreation bool
}

func (r topicMetadataRequestV6) size() int32 {
return sizeofStringArray([]string(r.Topics)) + 1
}

func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) {
// communicate nil-ness to the broker by passing -1 as the array length.
// for this particular request, the broker interpets a zero length array
// as a request for no topics whereas a nil array is for all topics.
if r.Topics == nil {
wb.writeArrayLen(-1)
} else {
wb.writeStringArray([]string(r.Topics))
}
wb.writeBool(r.AllowAutoTopicCreation)
}

type metadataResponseV6 struct {
ThrottleTimeMs int32
Brokers []brokerMetadataV1
ClusterId string
ControllerID int32
Topics []topicMetadataV6
}

func (r metadataResponseV6) size() int32 {
n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
n2 := sizeofNullableString(&r.ClusterId)
n3 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
return 4 + 4 + n1 + n2 + n3
}

func (r metadataResponseV6) writeTo(wb *writeBuffer) {
wb.writeInt32(r.ThrottleTimeMs)
wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
wb.writeString(r.ClusterId)
wb.writeInt32(r.ControllerID)
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
}

type topicMetadataV6 struct {
TopicErrorCode int16
TopicName string
Internal bool
Partitions []partitionMetadataV6
}

func (t topicMetadataV6) size() int32 {
return 2 + 1 +
sizeofString(t.TopicName) +
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
}

func (t topicMetadataV6) writeTo(wb *writeBuffer) {
wb.writeInt16(t.TopicErrorCode)
wb.writeString(t.TopicName)
wb.writeBool(t.Internal)
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
}

type partitionMetadataV6 struct {
PartitionErrorCode int16
PartitionID int32
Leader int32
Replicas []int32
Isr []int32
OfflineReplicas []int32
}

func (p partitionMetadataV6) size() int32 {
return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr) + sizeofInt32Array(p.OfflineReplicas)
}

func (p partitionMetadataV6) writeTo(wb *writeBuffer) {
wb.writeInt16(p.PartitionErrorCode)
wb.writeInt32(p.PartitionID)
wb.writeInt32(p.Leader)
wb.writeInt32Array(p.Replicas)
wb.writeInt32Array(p.Isr)
wb.writeInt32Array(p.OfflineReplicas)
}
3 changes: 2 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ const (
v2 = 2
v3 = 3
v5 = 5
v6 = 6
v7 = 7
v10 = 10

// Unused protocol versions: v4, v6, v8, v9.
// Unused protocol versions: v4, v8, v9.
)

var apiKeyStrings = [...]string{
Expand Down
24 changes: 24 additions & 0 deletions protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ func TestProtocol(t *testing.T) {
},
},

topicMetadataRequestV6{
Topics: []string{"A", "B", "C"},
AllowAutoTopicCreation: true,
},

metadataResponseV6{
Brokers: []brokerMetadataV1{
{NodeID: 1, Host: "localhost", Port: 9001},
{NodeID: 2, Host: "localhost", Port: 9002, Rack: "rack2"},
},
ClusterId: "cluster",
ControllerID: 2,
Topics: []topicMetadataV6{
{TopicErrorCode: 0, Internal: true, Partitions: []partitionMetadataV6{{
PartitionErrorCode: 0,
PartitionID: 1,
Leader: 2,
Replicas: []int32{1},
Isr: []int32{1},
OfflineReplicas: []int32{1},
}}},
},
},

listOffsetRequestV1{
ReplicaID: 1,
Topics: []listOffsetRequestTopicV1{
Expand Down