Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin: Add some missing admin methods #1178

Merged
merged 4 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 229 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package sarama

import "errors"
import (
"errors"
"math/rand"
"sync"
)

// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
Expand All @@ -13,6 +17,12 @@ type ClusterAdmin interface {
// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

// List the topics available in the cluster with the default options.
ListTopics() (map[string]TopicDetail, error)

// Describe some topics in the cluster
DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

// Delete a topic. It may take several seconds after the DeleteTopic to returns success
// and for all the brokers to become aware that the topics are gone.
// During this time, listTopics may continue to return information about the deleted topic.
Expand Down Expand Up @@ -65,6 +75,18 @@ type ClusterAdmin interface {
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

// List the consumer groups available in the cluster.
ListConsumerGroups() (map[string]string, error)

// Describe the given consumer group
DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

// List the consumer group offsets available in the cluster.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -150,6 +172,123 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
return nil
}

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
controller, err := ca.Controller()
if err != nil {
return nil, err
}

request := &MetadataRequest{
Topics: topics,
AllowAutoTopicCreation: false,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
}

response, err := controller.GetMetadata(request)
if err != nil {
return nil, err
}
return response.Topics, nil
}

func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
controller, err := ca.Controller()
if err != nil {
return nil, int32(0), err
}

request := &MetadataRequest{
Topics: []string{},
}

response, err := controller.GetMetadata(request)
if err != nil {
return nil, int32(0), err
}

return response.Brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
index := rand.Intn(len(brokers))
return brokers[index], nil
}
return nil, errors.New("no available broker")
}

func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
// In order to build TopicDetails we need to first get the list of all
// topics using a MetadataRequest and then get their configs using a
// DescribeConfigsRequest request. To avoid sending many requests to the
// broker, we use a single DescribeConfigsRequest.

// Send the all-topic MetadataRequest
b, err := ca.findAnyBroker()
if err != nil {
return nil, err
}
_ = b.Open(ca.client.Config())

metadataReq := &MetadataRequest{}
metadataResp, err := b.GetMetadata(metadataReq)
if err != nil {
return nil, err
}

topicsDetailsMap := make(map[string]TopicDetail)

var describeConfigsResources []*ConfigResource

for _, topic := range metadataResp.Topics {
topicDetails := TopicDetail{
NumPartitions: int32(len(topic.Partitions)),
}
if len(topic.Partitions) > 0 {
topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
}
topicsDetailsMap[topic.Name] = topicDetails

// we populate the resources we want to describe from the MetadataResponse
topicResource := ConfigResource{
Type: TopicResource,
Name: topic.Name,
}
describeConfigsResources = append(describeConfigsResources, &topicResource)
}

// Send the DescribeConfigsRequest
describeConfigsReq := &DescribeConfigsRequest{
Resources: describeConfigsResources,
}
describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
if err != nil {
return nil, err
}

for _, resource := range describeConfigsResp.Resources {
topicDetails := topicsDetailsMap[resource.Name]
topicDetails.ConfigEntries = make(map[string]*string)

for _, entry := range resource.Configs {
// only include non-default non-sensitive config
// (don't actually think topic config will ever be sensitive)
if entry.Default || entry.Sensitive {
continue
}
topicDetails.ConfigEntries[entry.Name] = &entry.Value
}

topicsDetailsMap[resource.Name] = topicDetails
}

return topicsDetailsMap, nil
}

func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
Expand Down Expand Up @@ -380,3 +519,92 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
}
return mAcls, nil
}

func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
groupsPerBroker := make(map[*Broker][]string)

for _, group := range groups {
controller, err := ca.client.Coordinator(group)
if err != nil {
return nil, err
}
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)

}

for broker, brokerGroups := range groupsPerBroker {
response, err := broker.DescribeGroups(&DescribeGroupsRequest{
Groups: brokerGroups,
})
if err != nil {
return nil, err
}

result = append(result, response.Groups...)
}
return result, nil
}

func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
allGroups = make(map[string]string)

// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
errors := make(chan error, len(brokers))
wg := sync.WaitGroup{}

for _, b := range brokers {
wg.Add(1)
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do

Suggested change
_ = b.Open(conf) // Ensure that broker is opened
b.Open(conf) // Ensure that broker is opened

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can, but then we don't document that there's an error (deliberately) ignored here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact errcheck will fail travis if i do what you suggest.. https://api.travis-ci.org/v3/job/490310704/log.txt i reverted that.


response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
errors <- err
return
}

groups := make(map[string]string)
for group, typ := range response.Groups {
groups[group] = typ
}

groupMaps <- groups

}(b, ca.conf)
}

wg.Wait()
close(groupMaps)
close(errors)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
allGroups[group] = protocolType
}
}

// Intentionally return only the first error for simplicity
err = <-errors
return
}

func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return nil, err
}

request := &OffsetFetchRequest{
ConsumerGroup: group,
partitions: topicPartitions,
}

if ca.conf.Version.IsAtLeast(V0_8_2_2) {
request.Version = 1
}

return coordinator.FetchOffset(request)
}
Loading