From 583ff7dd5d5e21bd9d71537dba09ab628f79d89e Mon Sep 17 00:00:00 2001 From: Nathan Witmer Date: Fri, 15 Jul 2022 15:05:53 -0600 Subject: [PATCH 1/2] Add test case demonstrating admin.DescribeLogDirs hang If DescribeLogDirs is called with an unknown broker ID, it will hang. --- admin_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/admin_test.go b/admin_test.go index c21a8c9d9..b27bea046 100644 --- a/admin_test.go +++ b/admin_test.go @@ -4,6 +4,7 @@ import ( "errors" "strings" "testing" + "time" ) func TestClusterAdmin(t *testing.T) { @@ -1742,3 +1743,48 @@ func TestDescribeLogDirs(t *testing.T) { t.Fatal(err) } } + +func TestDescribeLogDirsUnknownBroker(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t). + SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}), + }) + + config := NewTestConfig() + config.Version = V1_0_0_0 + + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + type result struct { + metadata map[int32][]DescribeLogDirsResponseDirMetadata + err error + } + + res := make(chan result) + + go func() { + metadata, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID() + 1}) + res <- result{metadata, err} + }() + + select { + case <-time.After(time.Second): + t.Fatalf("DescribeLogDirs timed out") + case returned := <-res: + if len(returned.metadata) != 0 { + t.Fatalf("Expected no results, got %v", len(returned.metadata)) + } + if returned.err != nil { + t.Fatalf("Expected no error, got %v", returned.err) + } + } +} From d2f69a0810d74fb4614cdb5e7ca2586b52b64856 Mon Sep 17 00:00:00 2001 From: Nathan Witmer Date: Fri, 15 Jul 2022 15:14:07 -0600 Subject: [PATCH 2/2] Increment the DescribeLogDirs waitgroup just prior to its goroutine If a broker ID wasn't found, the loop would `continue` after incrementing the waitgroup. Since there is no corresponding `wg.Done()` in that case, the final `wg.Wait()` would hang forever. --- admin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/admin.go b/admin.go index 83279c42b..f06198a65 100644 --- a/admin.go +++ b/admin.go @@ -1050,12 +1050,12 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32 wg := sync.WaitGroup{} for _, b := range brokerIds { - wg.Add(1) broker, err := ca.findBroker(b) if err != nil { Logger.Printf("Unable to find broker with ID = %v\n", b) continue } + wg.Add(1) go func(b *Broker, conf *Config) { defer wg.Done() _ = b.Open(conf) // Ensure that broker is opened