From 100324e594cf8b286c6554a41a7cf6d89a95438a Mon Sep 17 00:00:00 2001 From: ming luo Date: Tue, 22 Dec 2020 13:39:49 -0500 Subject: [PATCH 1/2] support toke and trust cert for pulsar perf client --- perf/perf-consumer.go | 5 ++--- perf/perf-producer.go | 1 + perf/pulsar-perf-go.go | 25 ++++++++++++++++++++++++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go index 582c06d3eb..7fb8aab8e9 100644 --- a/perf/perf-consumer.go +++ b/perf/perf-consumer.go @@ -65,9 +65,7 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) { b, _ = json.MarshalIndent(consumeArgs, "", " ") log.Info("Consumer config: ", string(b)) - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: clientArgs.ServiceURL, - }) + client, err := NewClient() if err != nil { log.Fatal(err) @@ -92,6 +90,7 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) { // Print stats of the consume rate tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { diff --git a/perf/perf-producer.go b/perf/perf-producer.go index ba6e197907..3ffa7c0ca3 100644 --- a/perf/perf-producer.go +++ b/perf/perf-producer.go @@ -136,6 +136,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) { // Print stats of the publish rate and latencies tick := time.NewTicker(10 * time.Second) + defer tick.Stop() q := quantile.NewTargeted(0.50, 0.95, 0.99, 0.999, 1.0) messagesPublished := 0 diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index 8fc0e09204..eb108954d0 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -20,11 +20,13 @@ package main import ( "context" "fmt" + "io/ioutil" "net/http" _ "net/http/pprof" "os" "os/signal" "strconv" + "strings" "github.com/spf13/cobra" @@ -40,7 +42,9 @@ var flagDebug bool var PrometheusPort int type ClientArgs struct { - ServiceURL string + ServiceURL string + TokenFile string + TLSTrustCertFile string } var clientArgs ClientArgs @@ -49,6 +53,23 @@ func NewClient() (pulsar.Client, error) { clientOpts := pulsar.ClientOptions{ URL: clientArgs.ServiceURL, } + + if clientArgs.TokenFile != "" { + // read JWT from the file + tokenBytes, err := ioutil.ReadFile(clientArgs.TokenFile) + if err != nil { + log.WithError(err).Errorf("failed to read Pulsar JWT from a file %s", clientArgs.TokenFile) + os.Exit(1) + } + clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes)) + } + + if strings.HasPrefix(clientArgs.ServiceURL, "pulsar+ssl://") { + if clientArgs.TLSTrustCertFile == "" { + return nil, fmt.Errorf("fatal error: missing trustStore while pulsar+ssl tls is enabled") + } + clientOpts.TLSTrustCertsFilePath = clientArgs.TLSTrustCertFile + } return pulsar.NewClient(clientOpts) } @@ -78,6 +99,8 @@ func main() { flags.BoolVar(&flagDebug, "debug", false, "enable debug output") flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u", "pulsar://localhost:6650", "The Pulsar service URL") + flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to the Pulsar JWT file") + flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", "file path to the trusted certificate file") rootCmd.AddCommand(newProducerCommand()) rootCmd.AddCommand(newConsumerCommand()) From d7b8b3774d89216bde32a7025f735ffc7ee97e2e Mon Sep 17 00:00:00 2001 From: ming luo Date: Wed, 23 Dec 2020 15:37:41 -0500 Subject: [PATCH 2/2] remove mandatory check for certificate in command line validation --- perf/pulsar-perf-go.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index eb108954d0..488a74ffa2 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -26,7 +26,6 @@ import ( "os" "os/signal" "strconv" - "strings" "github.com/spf13/cobra" @@ -64,10 +63,7 @@ func NewClient() (pulsar.Client, error) { clientOpts.Authentication = pulsar.NewAuthenticationToken(string(tokenBytes)) } - if strings.HasPrefix(clientArgs.ServiceURL, "pulsar+ssl://") { - if clientArgs.TLSTrustCertFile == "" { - return nil, fmt.Errorf("fatal error: missing trustStore while pulsar+ssl tls is enabled") - } + if clientArgs.TLSTrustCertFile != "" { clientOpts.TLSTrustCertsFilePath = clientArgs.TLSTrustCertFile } return pulsar.NewClient(clientOpts)