Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add describe log dirs request and response #1520

Merged
merged 9 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,18 @@ func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsRespon
return response, nil
}

//DescribeLogDirs sends a request to get the broker's log dir paths and sizes
func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
response := new(DescribeLogDirsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
83 changes: 83 additions & 0 deletions describe_log_dirs_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package sarama

// DescribeLogDirsRequest is a describe request to get partitions' log size
type DescribeLogDirsRequest struct {
// Version 0 and 1 are equal
// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
Version int16

// If this is an empty array, all topics will be queried
DescribeTopics []DescribeLogDirsRequestTopic
}

// DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic
type DescribeLogDirsRequestTopic struct {
Topic string
PartitionIDs []int32
}

func (r *DescribeLogDirsRequest) encode(pe packetEncoder) error {
length := len(r.DescribeTopics)
if length == 0 {
// In order to query all topics we must send null
length = -1
}

if err := pe.putArrayLength(length); err != nil {
return err
}

for _, d := range r.DescribeTopics {
if err := pe.putString(d.Topic); err != nil {
return err
}

if err := pe.putInt32Array(d.PartitionIDs); err != nil {
return err
}
}

return nil
}

func (r *DescribeLogDirsRequest) decode(pd packetDecoder, version int16) error {
n, err := pd.getArrayLength()
if err != nil {
return err
}
if n == -1 {
n = 0
}

topics := make([]DescribeLogDirsRequestTopic, n)
for i := 0; i < n; i++ {
topics[i] = DescribeLogDirsRequestTopic{}

topic, err := pd.getString()
if err != nil {
return err
}
topics[i].Topic = topic

pIDs, err := pd.getInt32Array()
if err != nil {
return err
}
topics[i].PartitionIDs = pIDs
}
r.DescribeTopics = topics

return nil
}

func (r *DescribeLogDirsRequest) key() int16 {
return 35
}

func (r *DescribeLogDirsRequest) version() int16 {
return r.Version
}

func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion {
return V1_0_0_0
}
31 changes: 31 additions & 0 deletions describe_log_dirs_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package sarama

import "testing"

var (
emptyDescribeLogDirsRequest = []byte{255, 255, 255, 255} // Empty array (array length -1 sent)
topicDescribeLogDirsRequest = []byte{
0, 0, 0, 1, // DescribeTopics array, Array length 1
0, 6, // Topic name length 6
'r', 'a', 'n', 'd', 'o', 'm', // Topic name
0, 0, 0, 2, // PartitionIDs int32 array, Array length 2
0, 0, 0, 25, // PartitionID 25
0, 0, 0, 26, // PartitionID 26
}
)

func TestDescribeLogDirsRequest(t *testing.T) {
request := &DescribeLogDirsRequest{
Version: 0,
DescribeTopics: []DescribeLogDirsRequestTopic{},
}
testRequest(t, "no topics", request, emptyDescribeLogDirsRequest)

request.DescribeTopics = []DescribeLogDirsRequestTopic{
DescribeLogDirsRequestTopic{
Topic: "random",
PartitionIDs: []int32{25, 26},
},
}
testRequest(t, "no topics", request, topicDescribeLogDirsRequest)
}
219 changes: 219 additions & 0 deletions describe_log_dirs_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package sarama

import "time"

type DescribeLogDirsResponse struct {
ThrottleTime time.Duration

// Version 0 and 1 are equal
// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
Version int16

LogDirs []DescribeLogDirsResponseDirMetadata
}

func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))

if err := pe.putArrayLength(len(r.LogDirs)); err != nil {
return err
}

for _, dir := range r.LogDirs {
if err := dir.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

// Decode array of DescribeLogDirsResponseDirMetadata
n, err := pd.getArrayLength()
if err != nil {
return err
}

r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
for i := 0; i < n; i++ {
dir := DescribeLogDirsResponseDirMetadata{}
if err := dir.decode(pd, version); err != nil {
return err
}
r.LogDirs[i] = dir
}

return nil
}

func (r *DescribeLogDirsResponse) key() int16 {
return 35
}

func (r *DescribeLogDirsResponse) version() int16 {
return r.Version
}

func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

type DescribeLogDirsResponseDirMetadata struct {
ErrorCode KError

// The absolute log directory path
Path string
Topics []DescribeLogDirsResponseTopic
}

func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
pe.putInt16(int16(r.ErrorCode))

if err := pe.putString(r.Path); err != nil {
return err
}

for _, topic := range r.Topics {
if err := topic.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
errCode, err := pd.getInt16()
if err != nil {
return err
}
r.ErrorCode = KError(errCode)

path, err := pd.getString()
if err != nil {
return err
}
r.Path = path

// Decode array of DescribeLogDirsResponseTopic
n, err := pd.getArrayLength()
if err != nil {
return err
}

r.Topics = make([]DescribeLogDirsResponseTopic, n)
for i := 0; i < n; i++ {
t := DescribeLogDirsResponseTopic{}

if err := t.decode(pd, version); err != nil {
return err
}

r.Topics[i] = t
}

return nil
}

// DescribeLogDirsResponseTopic contains a topic's partitions descriptions
type DescribeLogDirsResponseTopic struct {
Topic string
Partitions []DescribeLogDirsResponsePartition
}

func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
if err := pe.putString(r.Topic); err != nil {
return err
}

for _, partition := range r.Partitions {
if err := partition.encode(pe); err != nil {
return err
}
}

return nil
}

func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
t, err := pd.getString()
if err != nil {
return err
}
r.Topic = t

n, err := pd.getArrayLength()
if err != nil {
return err
}
r.Partitions = make([]DescribeLogDirsResponsePartition, n)
for i := 0; i < n; i++ {
p := DescribeLogDirsResponsePartition{}
if err := p.decode(pd, version); err != nil {
return err
}
r.Partitions[i] = p
}

return nil
}

// DescribeLogDirsResponsePartition describes a partition's log directory
type DescribeLogDirsResponsePartition struct {
PartitionID int32

// The size of the log segments of the partition in bytes.
Size int64

// The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
// current replica's LEO (if it is the future log for the partition)
OffsetLag int64

// True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
// the replica in the future.
IsTemporary bool
}

func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
pe.putInt32(r.PartitionID)
pe.putInt64(r.Size)
pe.putInt64(r.OffsetLag)
pe.putBool(r.IsTemporary)

return nil
}

func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
pID, err := pd.getInt32()
if err != nil {
return err
}
r.PartitionID = pID

size, err := pd.getInt64()
if err != nil {
return err
}
r.Size = size

lag, err := pd.getInt64()
if err != nil {
return err
}
r.OffsetLag = lag

isTemp, err := pd.getBool()
if err != nil {
return err
}
r.IsTemporary = isTemp

return nil
}
Loading