From 85eb99f4e324d100ebd34594a18c32d34de2d87e Mon Sep 17 00:00:00 2001 From: Zijian Date: Wed, 24 Jan 2024 22:23:29 +0000 Subject: [PATCH] Scaffold asyncqueue provider component --- cmd/server/cadence/server.go | 7 + cmd/server/main.go | 1 + common/archiver/provider/provider.go | 2 +- common/asyncworkflow/queue/interface.go | 35 +++++ common/asyncworkflow/queue/interface_mock.go | 88 ++++++++++++ common/asyncworkflow/queue/kafka/config.go | 84 ++++++++++++ common/asyncworkflow/queue/kafka/consumer.go | 70 ++++++++++ .../queue/kafka/consumer_test.go | 120 ++++++++++++++++ common/asyncworkflow/queue/kafka/init.go | 39 ++++++ common/asyncworkflow/queue/kafka/producer.go | 66 +++++++++ .../queue/kafka/producer_test.go | 120 ++++++++++++++++ common/asyncworkflow/queue/provider.go | 79 +++++++++++ .../asyncworkflow/queue/provider/provider.go | 79 +++++++++++ .../queue/provider/provider_test.go | 128 ++++++++++++++++++ common/asyncworkflow/queue/provider_test.go | 88 ++++++++++++ common/config/config.go | 4 +- common/messaging/kafka/client_impl.go | 4 +- common/messaging/kafka/consumer_impl.go | 7 +- common/messaging/kafka/consumer_impl_test.go | 4 +- common/resource/params.go | 50 +++---- common/resource/resource.go | 3 + common/resource/resourceImpl.go | 10 ++ common/resource/resourceTest.go | 11 ++ .../provider => }/syncmap/syncmap.go | 0 .../provider => }/syncmap/syncmap_test.go | 0 25 files changed, 1065 insertions(+), 34 deletions(-) create mode 100644 common/asyncworkflow/queue/interface.go create mode 100644 common/asyncworkflow/queue/interface_mock.go create mode 100644 common/asyncworkflow/queue/kafka/config.go create mode 100644 common/asyncworkflow/queue/kafka/consumer.go create mode 100644 common/asyncworkflow/queue/kafka/consumer_test.go create mode 100644 common/asyncworkflow/queue/kafka/init.go create mode 100644 common/asyncworkflow/queue/kafka/producer.go create mode 100644 common/asyncworkflow/queue/kafka/producer_test.go create mode 100644 common/asyncworkflow/queue/provider.go create mode 100644 common/asyncworkflow/queue/provider/provider.go create mode 100644 common/asyncworkflow/queue/provider/provider_test.go create mode 100644 common/asyncworkflow/queue/provider_test.go rename common/{archiver/provider => }/syncmap/syncmap.go (100%) rename common/{archiver/provider => }/syncmap/syncmap_test.go (100%) diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 21482b10698..dc11d029ef8 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -32,6 +32,8 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" + "github.com/uber/cadence/common/asyncworkflow/queue" + asyncworkflowprovider "github.com/uber/cadence/common/asyncworkflow/queue/provider" "github.com/uber/cadence/common/blobstore/filestore" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/config" @@ -284,6 +286,11 @@ func (s *server) startService() common.Daemon { params.BlobstoreClient = nil } + params.AsyncWorkflowQueueProvider, err = queue.NewAsyncQueueProvider(s.cfg.AsyncWorkflowQueues, &asyncworkflowprovider.Params{Logger: params.Logger, MetricsClient: params.MetricsClient}) + if err != nil { + log.Fatalf("error creating async queue provider: %v", err) + } + params.Logger.Info("Starting service " + s.name) var daemon common.Daemon diff --git a/cmd/server/main.go b/cmd/server/main.go index f61a0311a5e..6af56db5862 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -27,6 +27,7 @@ import ( "github.com/uber/cadence/common/metrics" _ "github.com/uber/cadence/common/archiver/gcloud" // needed to load the optional gcloud archiver plugin + _ "github.com/uber/cadence/common/asyncworkflow/queue/kafka" // needed to load kafka asyncworkflow queue _ "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra" // needed to load cassandra plugin _ "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql/public" // needed to load the default gocql client _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin diff --git a/common/archiver/provider/provider.go b/common/archiver/provider/provider.go index a1697265501..60576df55e6 100644 --- a/common/archiver/provider/provider.go +++ b/common/archiver/provider/provider.go @@ -26,8 +26,8 @@ import ( "sync" "github.com/uber/cadence/common/archiver" - "github.com/uber/cadence/common/archiver/provider/syncmap" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/syncmap" ) var ( diff --git a/common/asyncworkflow/queue/interface.go b/common/asyncworkflow/queue/interface.go new file mode 100644 index 00000000000..25c54c159d8 --- /dev/null +++ b/common/asyncworkflow/queue/interface.go @@ -0,0 +1,35 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -self_package github.com/uber/cadence/common/asyncworkflow/queue + +package queue + +import "github.com/uber/cadence/common/messaging" + +type ( + // Provider is used to get a queue for a given domain + Provider interface { + GetAsyncQueueProducer(domain string) (messaging.Producer, error) + GetAsyncQueueConsumer(domain string) (messaging.Consumer, error) + } +) diff --git a/common/asyncworkflow/queue/interface_mock.go b/common/asyncworkflow/queue/interface_mock.go new file mode 100644 index 00000000000..078e9a970b6 --- /dev/null +++ b/common/asyncworkflow/queue/interface_mock.go @@ -0,0 +1,88 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: interface.go + +// Package queue is a generated GoMock package. +package queue + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + messaging "github.com/uber/cadence/common/messaging" +) + +// MockProvider is a mock of Provider interface. +type MockProvider struct { + ctrl *gomock.Controller + recorder *MockProviderMockRecorder +} + +// MockProviderMockRecorder is the mock recorder for MockProvider. +type MockProviderMockRecorder struct { + mock *MockProvider +} + +// NewMockProvider creates a new mock instance. +func NewMockProvider(ctrl *gomock.Controller) *MockProvider { + mock := &MockProvider{ctrl: ctrl} + mock.recorder = &MockProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockProvider) EXPECT() *MockProviderMockRecorder { + return m.recorder +} + +// GetAsyncQueueConsumer mocks base method. +func (m *MockProvider) GetAsyncQueueConsumer(domain string) (messaging.Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAsyncQueueConsumer", domain) + ret0, _ := ret[0].(messaging.Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAsyncQueueConsumer indicates an expected call of GetAsyncQueueConsumer. +func (mr *MockProviderMockRecorder) GetAsyncQueueConsumer(domain interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAsyncQueueConsumer", reflect.TypeOf((*MockProvider)(nil).GetAsyncQueueConsumer), domain) +} + +// GetAsyncQueueProducer mocks base method. +func (m *MockProvider) GetAsyncQueueProducer(domain string) (messaging.Producer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAsyncQueueProducer", domain) + ret0, _ := ret[0].(messaging.Producer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAsyncQueueProducer indicates an expected call of GetAsyncQueueProducer. +func (mr *MockProviderMockRecorder) GetAsyncQueueProducer(domain interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAsyncQueueProducer", reflect.TypeOf((*MockProvider)(nil).GetAsyncQueueProducer), domain) +} diff --git a/common/asyncworkflow/queue/kafka/config.go b/common/asyncworkflow/queue/kafka/config.go new file mode 100644 index 00000000000..e8934a3d1b2 --- /dev/null +++ b/common/asyncworkflow/queue/kafka/config.go @@ -0,0 +1,84 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "fmt" + + "github.com/Shopify/sarama" + + "github.com/uber/cadence/common/authorization" + "github.com/uber/cadence/common/config" +) + +type ( + QueueConfig struct { + Connection ConnectionConfig `yaml:"connection"` + Topic string `yaml:"topic"` + } + + ConnectionConfig struct { + Brokers []string `yaml:"brokers"` + TLS config.TLS `yaml:"tls"` + SASL config.SASL `yaml:"sasl"` + } +) + +func newSaramaConfigWithAuth(tls *config.TLS, sasl *config.SASL) (*sarama.Config, error) { + saramaConfig := sarama.NewConfig() + + // TLS support + tlsConfig, err := tls.ToTLSConfig() + if err != nil { + return nil, fmt.Errorf("Error creating Kafka TLS config %w", err) + } + if tlsConfig != nil { + saramaConfig.Net.TLS.Enable = true + saramaConfig.Net.TLS.Config = tlsConfig + } + + // SASL support + if sasl.Enabled { + saramaConfig.Net.SASL.Enable = true + saramaConfig.Net.SASL.Handshake = true + saramaConfig.Net.SASL.User = sasl.User + saramaConfig.Net.SASL.Password = sasl.Password + switch sasl.Algorithm { + case "sha512": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &authorization.XDGSCRAMClient{HashGeneratorFcn: authorization.SHA512} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + case "sha256": + saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &authorization.XDGSCRAMClient{HashGeneratorFcn: authorization.SHA256} + } + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + case "plain": + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext + default: + return nil, fmt.Errorf("unknown SASL algorithm %v", sasl.Algorithm) + } + } + return saramaConfig, nil +} diff --git a/common/asyncworkflow/queue/kafka/consumer.go b/common/asyncworkflow/queue/kafka/consumer.go new file mode 100644 index 00000000000..0092a76ad33 --- /dev/null +++ b/common/asyncworkflow/queue/kafka/consumer.go @@ -0,0 +1,70 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "fmt" + "time" + + "github.com/Shopify/sarama" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/messaging/kafka" + "github.com/uber/cadence/common/metrics" +) + +// ConsumerConstructor is a function that constructs a queue consumer +func ConsumerConstructor(cfg *config.YamlNode, params *provider.Params) (messaging.Consumer, error) { + return newConsumerConstructor(params.MetricsClient, params.Logger)(cfg) + +} +func newConsumerConstructor(metricsClient metrics.Client, logger log.Logger) func(cfg *config.YamlNode) (messaging.Consumer, error) { + return func(cfg *config.YamlNode) (messaging.Consumer, error) { + var out *QueueConfig + if err := cfg.Decode(&out); err != nil { + return nil, fmt.Errorf("bad config: %w", err) + } + consumerGroup := fmt.Sprintf("%s-consumer", out.Topic) + dlqTopic := fmt.Sprintf("%s-dlq", out.Topic) + saramaConfig, err := newSaramaConfigWithAuth(&out.Connection.TLS, &out.Connection.SASL) + if err != nil { + return nil, err + } + saramaConfig.Consumer.Fetch.Default = 30 * 1024 * 1024 // 30MB. + saramaConfig.Consumer.Return.Errors = true + saramaConfig.Consumer.Offsets.CommitInterval = time.Second + saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest + saramaConfig.Consumer.MaxProcessingTime = 250 * time.Millisecond + + dlqConfig, err := newSaramaConfigWithAuth(&out.Connection.TLS, &out.Connection.SASL) + if err != nil { + return nil, err + } + dlqConfig.Producer.Return.Successes = true + dlqProducer, err := newProducer(dlqTopic, out.Connection.Brokers, saramaConfig, metricsClient, logger) + return kafka.NewKafkaConsumer(dlqProducer, out.Connection.Brokers, out.Topic, consumerGroup, saramaConfig, metricsClient, logger) + } +} diff --git a/common/asyncworkflow/queue/kafka/consumer_test.go b/common/asyncworkflow/queue/kafka/consumer_test.go new file mode 100644 index 00000000000..a40541dc1fd --- /dev/null +++ b/common/asyncworkflow/queue/kafka/consumer_test.go @@ -0,0 +1,120 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" +) + +func TestConsumerConstructor(t *testing.T) { + testCases := []struct { + name string + configYaml string + wantErr bool + }{ + { + name: "Success case", + configYaml: ` +connection: + brokers: + - localhost:9092 + tls: + enabled: false + sasl: + enabled: false + topic: test-topic +`, + wantErr: false, + }, + { + name: "Invalid yaml", + configYaml: ` +connection: +`, + wantErr: true, + }, + { + name: "Invalid auth config", + configYaml: ` +connection: + brokers: + - localhost:9092 + tls: + enabled: false + sasl: + enabled: true + algorithm: test + topic: test-topic +`, + wantErr: true, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + var out *config.YamlNode + if !tt.wantErr { + mockBroker := sarama.NewMockBroker(t, 0) + defer mockBroker.Close() + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). + SetLeader("test-topic", 0, mockBroker.BrokerID()). + SetController(mockBroker.BrokerID()), + }) + var queueCfg *QueueConfig + err := yaml.Unmarshal([]byte(tt.configYaml), &queueCfg) + require.NoError(t, err) + queueCfg.Connection.Brokers = []string{mockBroker.Addr()} + configYaml, err := yaml.Marshal(queueCfg) + require.NoError(t, err) + err = yaml.Unmarshal([]byte(configYaml), &out) + require.NoError(t, err) + } else { + err := yaml.Unmarshal([]byte(tt.configYaml), &out) + require.NoError(t, err) + } + + _, err := ConsumerConstructor(out, &provider.Params{ + Logger: testlogger.New(t), + MetricsClient: metrics.NewNoopMetricsClient(), + }) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } + +} diff --git a/common/asyncworkflow/queue/kafka/init.go b/common/asyncworkflow/queue/kafka/init.go new file mode 100644 index 00000000000..05d1ee8639b --- /dev/null +++ b/common/asyncworkflow/queue/kafka/init.go @@ -0,0 +1,39 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "fmt" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" +) + +func init() { + must := func(err error) { + if err != nil { + panic(fmt.Errorf("failed to register default provider: %w", err)) + } + } + must(provider.RegisterAsyncQueueProducerProvider("kafka", ProducerConstructor)) + must(provider.RegisterAsyncQueueConsumerProvider("kafka", ConsumerConstructor)) +} diff --git a/common/asyncworkflow/queue/kafka/producer.go b/common/asyncworkflow/queue/kafka/producer.go new file mode 100644 index 00000000000..0a09cfb03b7 --- /dev/null +++ b/common/asyncworkflow/queue/kafka/producer.go @@ -0,0 +1,66 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "fmt" + + "github.com/Shopify/sarama" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/messaging/kafka" + "github.com/uber/cadence/common/metrics" +) + +// ProducerConstructor is a function that constructs a queue producer +func ProducerConstructor(cfg *config.YamlNode, params *provider.Params) (messaging.Producer, error) { + return newProducerConstructor(params.MetricsClient, params.Logger)(cfg) +} + +// newProducerConstructor returns a new kafka producer constructor for async queue +func newProducerConstructor(metricsClient metrics.Client, logger log.Logger) func(cfg *config.YamlNode) (messaging.Producer, error) { + return func(cfg *config.YamlNode) (messaging.Producer, error) { + var out *QueueConfig + if err := cfg.Decode(&out); err != nil { + return nil, fmt.Errorf("bad config: %w", err) + } + + config, err := newSaramaConfigWithAuth(&out.Connection.TLS, &out.Connection.SASL) + if err != nil { + return nil, err + } + config.Producer.Return.Successes = true + return newProducer(out.Topic, out.Connection.Brokers, config, metricsClient, logger) + } +} + +func newProducer(topic string, brokers []string, saramaConfig *sarama.Config, metricsClient metrics.Client, logger log.Logger) (messaging.Producer, error) { + p, err := sarama.NewSyncProducer(brokers, saramaConfig) + if err != nil { + return nil, err + } + return messaging.NewMetricProducer(kafka.NewKafkaProducer(topic, p, logger), metricsClient), nil +} diff --git a/common/asyncworkflow/queue/kafka/producer_test.go b/common/asyncworkflow/queue/kafka/producer_test.go new file mode 100644 index 00000000000..54568517a33 --- /dev/null +++ b/common/asyncworkflow/queue/kafka/producer_test.go @@ -0,0 +1,120 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" +) + +func TestProducerConstructor(t *testing.T) { + testCases := []struct { + name string + configYaml string + wantErr bool + }{ + { + name: "Success case", + configYaml: ` +connection: + brokers: + - localhost:9092 + tls: + enabled: false + sasl: + enabled: false + topic: test-topic +`, + wantErr: false, + }, + { + name: "Invalid yaml", + configYaml: ` +connection: +`, + wantErr: true, + }, + { + name: "Invalid auth config", + configYaml: ` +connection: + brokers: + - localhost:9092 + tls: + enabled: false + sasl: + enabled: true + algorithm: test + topic: test-topic +`, + wantErr: true, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + var out *config.YamlNode + if !tt.wantErr { + mockBroker := sarama.NewMockBroker(t, 0) + defer mockBroker.Close() + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). + SetLeader("test-topic", 0, mockBroker.BrokerID()). + SetController(mockBroker.BrokerID()), + }) + var queueCfg *QueueConfig + err := yaml.Unmarshal([]byte(tt.configYaml), &queueCfg) + require.NoError(t, err) + queueCfg.Connection.Brokers = []string{mockBroker.Addr()} + configYaml, err := yaml.Marshal(queueCfg) + require.NoError(t, err) + err = yaml.Unmarshal([]byte(configYaml), &out) + require.NoError(t, err) + } else { + err := yaml.Unmarshal([]byte(tt.configYaml), &out) + require.NoError(t, err) + } + + _, err := ProducerConstructor(out, &provider.Params{ + Logger: testlogger.New(t), + MetricsClient: metrics.NewNoopMetricsClient(), + }) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } + +} diff --git a/common/asyncworkflow/queue/provider.go b/common/asyncworkflow/queue/provider.go new file mode 100644 index 00000000000..4d848a3e002 --- /dev/null +++ b/common/asyncworkflow/queue/provider.go @@ -0,0 +1,79 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package queue + +import ( + "fmt" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/messaging" +) + +type ( + providerImpl struct { + domainCache cache.DomainCache + + producers map[string]messaging.Producer + consumers map[string]messaging.Consumer + } +) + +// NewAsyncQueueProvider returns a new async queue provider +func NewAsyncQueueProvider(cfg map[string]config.AsyncWorkflowQueueProvider, params *provider.Params) (Provider, error) { + p := &providerImpl{ + producers: make(map[string]messaging.Producer), + consumers: make(map[string]messaging.Consumer), + } + for queueName, queueCfg := range cfg { + producerConstructor, ok := provider.GetAsyncQueueProducerProvider(queueCfg.Type) + if !ok { + return nil, fmt.Errorf("queue type %v not registered", queueCfg.Type) + } + producer, err := producerConstructor(queueCfg.Config, params) + if err != nil { + return nil, err + } + p.producers[queueName] = producer + + consumerConstructor, ok := provider.GetAsyncQueueConsumerProvider(queueCfg.Type) + if !ok { + return nil, fmt.Errorf("queue type %v not registered", queueCfg.Type) + } + consumer, err := consumerConstructor(queueCfg.Config, params) + if err != nil { + return nil, err + } + p.consumers[queueName] = consumer + } + return p, nil +} + +func (p *providerImpl) GetAsyncQueueProducer(domain string) (messaging.Producer, error) { + return nil, fmt.Errorf("to be implemented") +} + +func (p *providerImpl) GetAsyncQueueConsumer(domain string) (messaging.Consumer, error) { + return nil, fmt.Errorf("to be implemented") +} diff --git a/common/asyncworkflow/queue/provider/provider.go b/common/asyncworkflow/queue/provider/provider.go new file mode 100644 index 00000000000..86137425abc --- /dev/null +++ b/common/asyncworkflow/queue/provider/provider.go @@ -0,0 +1,79 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package provider + +import ( + "fmt" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/syncmap" +) + +type ( + Params struct { + Logger log.Logger + MetricsClient metrics.Client + } + + // ProducerConstructor is a function that constructs a queue producer + ProducerConstructor func(*config.YamlNode, *Params) (messaging.Producer, error) + + // ConsumerConstructor is a function that constructs a queue consumer + ConsumerConstructor func(*config.YamlNode, *Params) (messaging.Consumer, error) +) + +var ( + asyncQueueProducerConstructors = syncmap.New[string, ProducerConstructor]() + asyncQueueConsumerConstructors = syncmap.New[string, ConsumerConstructor]() +) + +// RegisterAsyncQueueProducerProvider registers a queue producer constructor for a given queue type +func RegisterAsyncQueueProducerProvider(queueType string, producerConstructor ProducerConstructor) error { + inserted := asyncQueueProducerConstructors.Put(queueType, producerConstructor) + if !inserted { + return fmt.Errorf("queue type %v already registered", queueType) + } + return nil +} + +// GetAsyncQueueProducerProvider returns a queue producer constructor for a given queue type +func GetAsyncQueueProducerProvider(queueType string) (ProducerConstructor, bool) { + return asyncQueueProducerConstructors.Get(queueType) +} + +// RegisterAsyncQueueConsumerProvider registers a queue consumer constructor for a given queue type +func RegisterAsyncQueueConsumerProvider(queueType string, consumerConstructor ConsumerConstructor) error { + inserted := asyncQueueConsumerConstructors.Put(queueType, consumerConstructor) + if !inserted { + return fmt.Errorf("queue type %v already registered", queueType) + } + return nil +} + +// GetAsyncQueueConsumerProvider returns a queue consumer constructor for a given queue type +func GetAsyncQueueConsumerProvider(queueType string) (ConsumerConstructor, bool) { + return asyncQueueConsumerConstructors.Get(queueType) +} diff --git a/common/asyncworkflow/queue/provider/provider_test.go b/common/asyncworkflow/queue/provider/provider_test.go new file mode 100644 index 00000000000..c32887e87b8 --- /dev/null +++ b/common/asyncworkflow/queue/provider/provider_test.go @@ -0,0 +1,128 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package provider + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/messaging" +) + +func TestAsyncQueueProducerProvider(t *testing.T) { + testCases := []struct { + name string + queueType string + setup func() + wantErr bool + }{ + { + name: "Success case", + queueType: "q1", + wantErr: false, + }, + { + name: "Duplicate type", + queueType: "q2", + setup: func() { + RegisterAsyncQueueProducerProvider("q2", func(*config.YamlNode, *Params) (messaging.Producer, error) { + return nil, nil + }) + }, + wantErr: true, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + _, ok := GetAsyncQueueProducerProvider(tt.queueType) + assert.False(t, ok) + + if tt.setup != nil { + tt.setup() + } + + err := RegisterAsyncQueueProducerProvider(tt.queueType, func(*config.YamlNode, *Params) (messaging.Producer, error) { + return nil, nil + }) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + _, ok = GetAsyncQueueProducerProvider(tt.queueType) + assert.True(t, ok) + }) + } +} + +func TestAsyncQueueConsumerProvider(t *testing.T) { + testCases := []struct { + name string + queueType string + setup func() + wantErr bool + }{ + { + name: "Success case", + queueType: "q1", + wantErr: false, + }, + { + name: "Duplicate type", + queueType: "q2", + setup: func() { + RegisterAsyncQueueConsumerProvider("q2", func(*config.YamlNode, *Params) (messaging.Consumer, error) { + return nil, nil + }) + }, + wantErr: true, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + _, ok := GetAsyncQueueConsumerProvider(tt.queueType) + assert.False(t, ok) + + if tt.setup != nil { + tt.setup() + } + + err := RegisterAsyncQueueConsumerProvider(tt.queueType, func(*config.YamlNode, *Params) (messaging.Consumer, error) { + return nil, nil + }) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + _, ok = GetAsyncQueueConsumerProvider(tt.queueType) + assert.True(t, ok) + }) + } +} diff --git a/common/asyncworkflow/queue/provider_test.go b/common/asyncworkflow/queue/provider_test.go new file mode 100644 index 00000000000..c764b640f05 --- /dev/null +++ b/common/asyncworkflow/queue/provider_test.go @@ -0,0 +1,88 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/messaging" +) + +// mockProducerConstructor is a mock function for producer constructor +func mockProducerConstructor(cfg *config.YamlNode, params *provider.Params) (messaging.Producer, error) { + // Mock implementation + return nil, nil +} + +// mockConsumerConstructor is a mock function for consumer constructor +func mockConsumerConstructor(cfg *config.YamlNode, params *provider.Params) (messaging.Consumer, error) { + // Mock implementation + return nil, nil +} + +func TestNewAsyncQueueProvider(t *testing.T) { + // Mock the provider registration functions + provider.RegisterAsyncQueueProducerProvider("validType", mockProducerConstructor) + provider.RegisterAsyncQueueConsumerProvider("validType", mockConsumerConstructor) + + tests := []struct { + name string + cfg map[string]config.AsyncWorkflowQueueProvider + expectError bool + errorContains string + }{ + { + name: "Successful Initialization", + cfg: map[string]config.AsyncWorkflowQueueProvider{ + "testQueue": {Type: "validType", Config: &config.YamlNode{}}, + }, + expectError: false, + }, + { + name: "Unregistered Queue Type", + cfg: map[string]config.AsyncWorkflowQueueProvider{ + "testQueue": {Type: "invalidType", Config: &config.YamlNode{}}, + }, + expectError: true, + errorContains: "not registered", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewAsyncQueueProvider(tt.cfg, &provider.Params{}) + if tt.expectError { + assert.Error(t, err) + if tt.errorContains != "" { + assert.Contains(t, err.Error(), tt.errorContains) + } + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/common/config/config.go b/common/config/config.go index 1033d262ed2..8dd704088d9 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -600,8 +600,8 @@ type ( // AsyncWorkflowQueueProvider contains the config for an async workflow queue. Only one field must be set. AsyncWorkflowQueueProvider struct { - Type string `yaml:"type"` - AsyncWorkflowQueueConfig *YamlNode `yaml:"config"` + Type string `yaml:"type"` + Config *YamlNode `yaml:"config"` } ) diff --git a/common/messaging/kafka/client_impl.go b/common/messaging/kafka/client_impl.go index 892d2129961..8be63febd30 100644 --- a/common/messaging/kafka/client_impl.go +++ b/common/messaging/kafka/client_impl.go @@ -112,7 +112,9 @@ func (c *clientImpl) NewConsumer(app, consumerName string) (messaging.Consumer, return nil, err } - return newKafkaConsumer(dlqProducer, c.config, topics.Topic, consumerName, saramaConfig, c.metricsClient, c.logger) + clusterName := c.config.GetKafkaClusterForTopic(topics.Topic) + brokers := c.config.GetBrokersForKafkaCluster(clusterName) + return NewKafkaConsumer(dlqProducer, brokers, topics.Topic, consumerName, saramaConfig, c.metricsClient, c.logger) } // NewProducer is used to create a Kafka producer diff --git a/common/messaging/kafka/consumer_impl.go b/common/messaging/kafka/consumer_impl.go index 81e4fd0cb43..39df8ddeb79 100644 --- a/common/messaging/kafka/consumer_impl.go +++ b/common/messaging/kafka/consumer_impl.go @@ -29,7 +29,6 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" - "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" @@ -78,17 +77,15 @@ type ( var _ messaging.Message = (*messageImpl)(nil) var _ messaging.Consumer = (*consumerImpl)(nil) -func newKafkaConsumer( +func NewKafkaConsumer( dlqProducer messaging.Producer, - kafkaConfig *config.KafkaConfig, + brokers []string, topic string, consumerName string, saramaConfig *sarama.Config, metricsClient metrics.Client, logger log.Logger, ) (messaging.Consumer, error) { - clusterName := kafkaConfig.GetKafkaClusterForTopic(topic) - brokers := kafkaConfig.GetBrokersForKafkaCluster(clusterName) consumerGroup, err := sarama.NewConsumerGroup(brokers, consumerName, saramaConfig) if err != nil { return nil, err diff --git a/common/messaging/kafka/consumer_impl_test.go b/common/messaging/kafka/consumer_impl_test.go index 8ae22535881..e4e9e61aaaa 100644 --- a/common/messaging/kafka/consumer_impl_test.go +++ b/common/messaging/kafka/consumer_impl_test.go @@ -69,7 +69,9 @@ func TestNewConsumer(t *testing.T) { metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) logger := testlogger.New(t) kafkaProducer := NewKafkaProducer(topic, mockProducer, logger) - consumer, err := newKafkaConsumer(kafkaProducer, kafkaConfig, topic, consumerName, + clusterName := kafkaConfig.GetKafkaClusterForTopic(topic) + brokers := kafkaConfig.GetBrokersForKafkaCluster(clusterName) + consumer, err := NewKafkaConsumer(kafkaProducer, brokers, topic, consumerName, nil, metricsClient, logger) assert.NoError(t, err, "An error was not expected but got %v", err) assert.NotNil(t, consumer, "Expected consumer but got nil") diff --git a/common/resource/params.go b/common/resource/params.go index 2e5bd6f9b7e..a34f3e54549 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -27,6 +27,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" + "github.com/uber/cadence/common/asyncworkflow/queue" "github.com/uber/cadence/common/authorization" "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/cluster" @@ -52,29 +53,30 @@ type ( ThrottledLogger log.Logger HostName string - MetricScope tally.Scope - MembershipResolver membership.Resolver - RPCFactory common.RPCFactory - PProfInitializer common.PProfInitializer - PersistenceConfig config.Persistence - ClusterMetadata cluster.Metadata - ReplicatorConfig config.Replicator - MetricsClient metrics.Client - MessagingClient messaging.Client - BlobstoreClient blobstore.Client - ESClient es.GenericClient - ESConfig *config.ElasticSearchConfig - DynamicConfig dynamicconfig.Client - ClusterRedirectionPolicy *config.ClusterRedirectionPolicy - PublicClient workflowserviceclient.Interface - ArchivalMetadata archiver.ArchivalMetadata - ArchiverProvider provider.ArchiverProvider - Authorizer authorization.Authorizer // NOTE: this can be nil. If nil, AccessControlledHandlerImpl will initiate one with config.Authorization - AuthorizationConfig config.Authorization // NOTE: empty(default) struct will get a authorization.NoopAuthorizer - IsolationGroupStore configstore.Client // This can be nil, the default config store will be created if so - IsolationGroupState isolationgroup.State // This can be nil, the default state store will be chosen if so - Partitioner partition.Partitioner - PinotConfig *config.PinotVisibilityConfig - PinotClient pinot.GenericClient + MetricScope tally.Scope + MembershipResolver membership.Resolver + RPCFactory common.RPCFactory + PProfInitializer common.PProfInitializer + PersistenceConfig config.Persistence + ClusterMetadata cluster.Metadata + ReplicatorConfig config.Replicator + MetricsClient metrics.Client + MessagingClient messaging.Client + BlobstoreClient blobstore.Client + ESClient es.GenericClient + ESConfig *config.ElasticSearchConfig + DynamicConfig dynamicconfig.Client + ClusterRedirectionPolicy *config.ClusterRedirectionPolicy + PublicClient workflowserviceclient.Interface + ArchivalMetadata archiver.ArchivalMetadata + ArchiverProvider provider.ArchiverProvider + Authorizer authorization.Authorizer // NOTE: this can be nil. If nil, AccessControlledHandlerImpl will initiate one with config.Authorization + AuthorizationConfig config.Authorization // NOTE: empty(default) struct will get a authorization.NoopAuthorizer + IsolationGroupStore configstore.Client // This can be nil, the default config store will be created if so + IsolationGroupState isolationgroup.State // This can be nil, the default state store will be chosen if so + Partitioner partition.Partitioner + PinotConfig *config.PinotVisibilityConfig + PinotClient pinot.GenericClient + AsyncWorkflowQueueProvider queue.Provider } ) diff --git a/common/resource/resource.go b/common/resource/resource.go index d19d1712361..6c1639622c3 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -32,6 +32,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" + "github.com/uber/cadence/common/asyncworkflow/queue" "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" @@ -113,5 +114,7 @@ type ( GetIsolationGroupState() isolationgroup.State GetPartitioner() partition.Partitioner GetIsolationGroupStore() configstore.Client + + GetAsyncWorkflowQueueProvider() queue.Provider } ) diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index 87b1dba2082..7b494d0b902 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -38,6 +38,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" + "github.com/uber/cadence/common/asyncworkflow/queue" "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" @@ -131,6 +132,8 @@ type ( isolationGroups isolationgroup.State isolationGroupConfigStore configstore.Client partitioner partition.Partitioner + + asyncWorkflowQueueProvider queue.Provider } ) @@ -336,6 +339,8 @@ func New( isolationGroups: isolationGroupState, isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available partitioner: partitioner, + + asyncWorkflowQueueProvider: params.AsyncWorkflowQueueProvider, } return impl, nil } @@ -610,6 +615,11 @@ func (h *Impl) GetIsolationGroupStore() configstore.Client { return h.isolationGroupConfigStore } +// GetAsyncWorkflowQueueProvider returns the async workflow queue provider +func (h *Impl) GetAsyncWorkflowQueueProvider() queue.Provider { + return h.asyncWorkflowQueueProvider +} + // due to the config store being only available for some // persistence layers, *both* the configStoreClient and IsolationGroupState // will be optionally available diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index 168426a731b..5f69e67949b 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" + "github.com/uber/cadence/common/asyncworkflow/queue" "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" @@ -103,6 +104,8 @@ type ( HostName string Logger log.Logger taskvalidator taskvalidator.Checker + + AsyncWorkflowQueueProvider *queue.MockProvider } ) @@ -162,6 +165,8 @@ func NewTest( scope := tally.NewTestScope("test", nil) + asyncWorkflowQueueProvider := queue.NewMockProvider(controller) + return &Test{ MetricsScope: scope, @@ -208,6 +213,8 @@ func NewTest( // logger Logger: logger, + + AsyncWorkflowQueueProvider: asyncWorkflowQueueProvider, } } @@ -437,6 +444,10 @@ func (s *Test) GetIsolationGroupStore() configstore.Client { return s.IsolationGroupStore } +func (s *Test) GetAsyncWorkflowQueueProvider() queue.Provider { + return s.AsyncWorkflowQueueProvider +} + // Finish checks whether expectations are met func (s *Test) Finish( t mock.TestingT, diff --git a/common/archiver/provider/syncmap/syncmap.go b/common/syncmap/syncmap.go similarity index 100% rename from common/archiver/provider/syncmap/syncmap.go rename to common/syncmap/syncmap.go diff --git a/common/archiver/provider/syncmap/syncmap_test.go b/common/syncmap/syncmap_test.go similarity index 100% rename from common/archiver/provider/syncmap/syncmap_test.go rename to common/syncmap/syncmap_test.go