diff --git a/cmd/channel/distributed/dispatcher/main.go b/cmd/channel/distributed/dispatcher/main.go index b5543eba2c..b7fd644e28 100644 --- a/cmd/channel/distributed/dispatcher/main.go +++ b/cmd/channel/distributed/dispatcher/main.go @@ -21,14 +21,16 @@ import ( "strconv" "strings" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" + distributedcommonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" commonk8s "knative.dev/eventing-kafka/pkg/channel/distributed/common/k8s" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama" "knative.dev/eventing-kafka/pkg/channel/distributed/common/metrics" @@ -88,7 +90,7 @@ func main() { } // Update The Sarama Config - Username/Password Overrides (Values From Secret Take Precedence Over ConfigMap) - kafkaAuthCfg, err := commonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace) + kafkaAuthCfg, err := distributedcommonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace) if err != nil { logger.Fatal("Failed To Load Auth Config", zap.Error(err)) } @@ -103,13 +105,13 @@ func main() { sarama.EnableSaramaLogging(ekConfig.Kafka.EnableSaramaLogging) // Initialize Tracing (Watches config-tracing ConfigMap, Assumes Context Came From LoggingContext With Embedded K8S Client Key) - err = commonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace) + err = distributedcommonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace) if err != nil { logger.Fatal("Failed To Initialize Tracing - Terminating", zap.Error(err)) } // Initialize Observability (Watches config-observability ConfigMap And Starts Profiling Server) - err = commonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace) + err = distributedcommonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace) if err != nil { logger.Fatal("Failed To Initialize Observability - Terminating", zap.Error(err)) } @@ -143,14 +145,23 @@ func main() { } dispatcher = dispatch.NewDispatcher(dispatcherConfig) + // Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration + // Since this is designed to be called by the main() function, the default KNative package behavior here + // is a fatal exit if the watch cannot be set up. + cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger.Sugar()) + // Watch The Settings ConfigMap For Changes - err = commonconfig.InitializeConfigWatcher(ctx, logger.Sugar(), configMapObserver, environment.SystemNamespace) + err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger.Sugar(), configMapObserver, environment.SystemNamespace) if err != nil { logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err)) } + if err := cmw.Start(ctx.Done()); err != nil { + logger.Fatal("Failed to start configmap watcher", zap.Error(err)) + } + // Watch The Secret For Changes - err = commonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver) + err = distributedcommonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver) if err != nil { logger.Fatal("Failed To Start Secret Watcher", zap.Error(err)) } diff --git a/cmd/channel/distributed/receiver/main.go b/cmd/channel/distributed/receiver/main.go index 262bf01590..2cc93556cf 100644 --- a/cmd/channel/distributed/receiver/main.go +++ b/cmd/channel/distributed/receiver/main.go @@ -27,7 +27,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" - commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" + distributedcommonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" commonk8s "knative.dev/eventing-kafka/pkg/channel/distributed/common/k8s" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama" kafkautil "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/util" @@ -38,9 +38,11 @@ import ( channelhealth "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/health" "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/producer" kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" eventingchannel "knative.dev/eventing/pkg/channel" injectionclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" eventingmetrics "knative.dev/pkg/metrics" @@ -87,7 +89,7 @@ func main() { } // Update The Sarama Config - Username/Password Overrides (Values From Secret Take Precedence Over ConfigMap) - kafkaAuthCfg, err := commonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace) + kafkaAuthCfg, err := distributedcommonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace) if err != nil { logger.Fatal("Failed To Load Auth Config", zap.Error(err)) } @@ -102,13 +104,13 @@ func main() { sarama.EnableSaramaLogging(ekConfig.Kafka.EnableSaramaLogging) // Initialize Tracing (Watches config-tracing ConfigMap, Assumes Context Came From LoggingContext With Embedded K8S Client Key) - err = commonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace) + err = distributedcommonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace) if err != nil { logger.Fatal("Could Not Initialize Tracing - Terminating", zap.Error(err)) } // Initialize Observability (Watches config-observability ConfigMap And Starts Profiling Server) - err = commonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace) + err = distributedcommonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace) if err != nil { logger.Fatal("Could Not Initialize Observability - Terminating", zap.Error(err)) } @@ -133,14 +135,23 @@ func main() { // Create A New Stats StatsReporter statsReporter := metrics.NewStatsReporter(logger) + // Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration + // Since this is designed to be called by the main() function, the default KNative package behavior here + // is a fatal exit if the watch cannot be set up. + cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger.Sugar()) + // Watch The Settings ConfigMap For Changes - err = commonconfig.InitializeConfigWatcher(ctx, logger.Sugar(), configMapObserver, environment.SystemNamespace) + err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger.Sugar(), configMapObserver, environment.SystemNamespace) if err != nil { logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err)) } + if err := cmw.Start(ctx.Done()); err != nil { + logger.Fatal("Failed to start configmap watcher", zap.Error(err)) + } + // Watch The Secret For Changes - err = commonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver) + err = distributedcommonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver) if err != nil { logger.Fatal("Failed To Start Secret Watcher", zap.Error(err)) } diff --git a/pkg/channel/consolidated/reconciler/controller/controller.go b/pkg/channel/consolidated/reconciler/controller/controller.go index 98cc260e0f..309b6f1779 100644 --- a/pkg/channel/consolidated/reconciler/controller/controller.go +++ b/pkg/channel/consolidated/reconciler/controller/controller.go @@ -20,13 +20,9 @@ import ( "context" "github.com/kelseyhightower/envconfig" - "knative.dev/eventing-kafka/pkg/common/constants" - "go.uber.org/zap" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" @@ -74,13 +70,15 @@ func NewController( roleBindingLister: roleBindingInformer.Lister(), } + logger := logging.FromContext(ctx) + env := &envConfig{} if err := envconfig.Process("", env); err != nil { - logging.FromContext(ctx).Panicf("unable to process Kafka channel's required environment variables: %v", err) + logger.Panicf("unable to process Kafka channel's required environment variables: %v", err) } if env.Image == "" { - logging.FromContext(ctx).Panic("unable to process Kafka channel's required environment variables (missing DISPATCHER_IMAGE)") + logger.Panic("unable to process Kafka channel's required environment variables (missing DISPATCHER_IMAGE)") } r.dispatcherImage = env.Image @@ -88,15 +86,12 @@ func NewController( impl := kafkaChannelReconciler.NewImpl(ctx, r) // Get and Watch the Kakfa config map and dynamically update Kafka configuration. - if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, constants.SettingsConfigMapName, metav1.GetOptions{}); err == nil { - cmw.Watch(constants.SettingsConfigMapName, func(configMap *v1.ConfigMap) { - r.updateKafkaConfig(ctx, configMap) - }) - } else if !apierrors.IsNotFound(err) { - logging.FromContext(ctx).With(zap.Error(err)).Fatalf("Error reading ConfigMap '%s'", constants.SettingsConfigMapName) + err := commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, r.updateKafkaConfig, system.Namespace()) + if err != nil { + logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err)) } - logging.FromContext(ctx).Info("Setting up event handlers") + logger.Info("Setting up event handlers") kafkaChannelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) // Set up watches for dispatcher resources we care about, since any changes to these diff --git a/pkg/channel/distributed/common/config/observability_test.go b/pkg/channel/distributed/common/config/observability_test.go index 308accbcab..6d75e25294 100644 --- a/pkg/channel/distributed/common/config/observability_test.go +++ b/pkg/channel/distributed/common/config/observability_test.go @@ -30,7 +30,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" injectionclient "knative.dev/pkg/client/injection/kube/client" configmap "knative.dev/pkg/configmap/informer" logtesting "knative.dev/pkg/logging/testing" diff --git a/pkg/channel/distributed/common/config/secretwatcher_test.go b/pkg/channel/distributed/common/config/secretwatcher_test.go index 9794c4e12c..8673e5babc 100644 --- a/pkg/channel/distributed/common/config/secretwatcher_test.go +++ b/pkg/channel/distributed/common/config/secretwatcher_test.go @@ -27,8 +27,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" kafkaconstants "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/constants" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" "knative.dev/eventing-kafka/pkg/common/constants" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" injectionclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/system" ) diff --git a/pkg/channel/distributed/common/config/tracing_test.go b/pkg/channel/distributed/common/config/tracing_test.go index 1ab2c16682..29d8564f18 100644 --- a/pkg/channel/distributed/common/config/tracing_test.go +++ b/pkg/channel/distributed/common/config/tracing_test.go @@ -20,7 +20,7 @@ import ( "context" "testing" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" diff --git a/pkg/channel/distributed/common/k8s/context_test.go b/pkg/channel/distributed/common/k8s/context_test.go index 3208203d6a..ccf41246f3 100644 --- a/pkg/channel/distributed/common/k8s/context_test.go +++ b/pkg/channel/distributed/common/k8s/context_test.go @@ -25,7 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" "knative.dev/pkg/system" diff --git a/pkg/channel/distributed/common/kafka/sarama/sarama.go b/pkg/channel/distributed/common/kafka/sarama/sarama.go index 3bb7e0cdd7..5dfe1e5e61 100644 --- a/pkg/channel/distributed/common/kafka/sarama/sarama.go +++ b/pkg/channel/distributed/common/kafka/sarama/sarama.go @@ -27,8 +27,8 @@ import ( "github.com/ghodss/yaml" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/common/client" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/eventing-kafka/pkg/common/constants" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/system" diff --git a/pkg/channel/distributed/common/kafka/sarama/sarama_test.go b/pkg/channel/distributed/common/kafka/sarama/sarama_test.go index e1a376c955..e3cd5f6a07 100644 --- a/pkg/channel/distributed/common/kafka/sarama/sarama_test.go +++ b/pkg/channel/distributed/common/kafka/sarama/sarama_test.go @@ -28,9 +28,9 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/client-go/kubernetes/fake" - commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/eventing-kafka/pkg/common/constants" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" injectionclient "knative.dev/pkg/client/injection/kube/client" ) diff --git a/pkg/channel/distributed/controller/config/config.go b/pkg/channel/distributed/controller/config/config.go index a438b9c0dd..2815a47f56 100644 --- a/pkg/channel/distributed/controller/config/config.go +++ b/pkg/channel/distributed/controller/config/config.go @@ -19,8 +19,8 @@ package config import ( "strings" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" ) // ConfigurationError is the type of error returned from VerifyConfiguration @@ -33,7 +33,7 @@ func (err ControllerConfigurationError) Error() string { // VerifyConfiguration returns an error if mandatory fields in the EventingKafkaConfig have not been set either // via the external configmap or the internal variables. -func VerifyConfiguration(configuration *config.EventingKafkaConfig) error { +func VerifyConfiguration(configuration *commonconfig.EventingKafkaConfig) error { // Verify & Lowercase The Kafka AdminType lowercaseKafkaAdminType := strings.ToLower(configuration.Kafka.AdminType) diff --git a/pkg/channel/distributed/controller/config/config_test.go b/pkg/channel/distributed/controller/config/config_test.go index 3f35be7e25..d8fc92ecd6 100644 --- a/pkg/channel/distributed/controller/config/config_test.go +++ b/pkg/channel/distributed/controller/config/config_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/resource" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" ) // Test Constants @@ -163,7 +163,7 @@ func TestVerifyConfiguration(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - testConfig := &config.EventingKafkaConfig{} + testConfig := &commonconfig.EventingKafkaConfig{} testConfig.Kafka.Topic.DefaultNumPartitions = testCase.kafkaTopicDefaultNumPartitions testConfig.Kafka.Topic.DefaultReplicationFactor = testCase.kafkaTopicDefaultReplicationFactor testConfig.Kafka.Topic.DefaultRetentionMillis = testCase.kafkaTopicDefaultRetentionMillis diff --git a/pkg/channel/distributed/controller/kafkachannel/controller.go b/pkg/channel/distributed/controller/kafkachannel/controller.go index c68f25fb1e..0856014b24 100644 --- a/pkg/channel/distributed/controller/kafkachannel/controller.go +++ b/pkg/channel/distributed/controller/kafkachannel/controller.go @@ -24,7 +24,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" kafkachannelv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin/types" clientconstants "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/constants" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama" @@ -35,6 +34,7 @@ import ( "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" commonclient "knative.dev/eventing-kafka/pkg/common/client" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/client/injection/kube/informers/core/v1/service" @@ -47,7 +47,7 @@ import ( var rec *Reconciler // Create A New KafkaChannel Controller -func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl { +func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { // Get A Logger logger := logging.FromContext(ctx).Desugar() @@ -140,7 +140,7 @@ func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl { } // Watch The Settings ConfigMap For Changes - err = commonconfig.InitializeConfigWatcher(ctx, logger.Sugar(), rec.configMapObserver, environment.SystemNamespace) + err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger.Sugar(), rec.configMapObserver, environment.SystemNamespace) if err != nil { logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err)) } diff --git a/pkg/channel/distributed/controller/kafkachannel/controller_test.go b/pkg/channel/distributed/controller/kafkachannel/controller_test.go index 1fad246427..91f04dd1c7 100644 --- a/pkg/channel/distributed/controller/kafkachannel/controller_test.go +++ b/pkg/channel/distributed/controller/kafkachannel/controller_test.go @@ -29,16 +29,17 @@ import ( "k8s.io/client-go/rest" kafkachannelv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" commonenv "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants" controllerenv "knative.dev/eventing-kafka/pkg/channel/distributed/controller/env" controllertesting "knative.dev/eventing-kafka/pkg/channel/distributed/controller/testing" fakeKafkaClient "knative.dev/eventing-kafka/pkg/client/injection/client/fake" _ "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" // Knative Fake Informer Injection + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake" // Knative Fake Informer Injection _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" // Knative Fake Informer Injection "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" ) @@ -67,11 +68,14 @@ func TestNewController(t *testing.T) { ctx, fakeKafkaClientset := fakeKafkaClient.With(ctx) assert.NotNil(t, fakeKafkaClientset) + // Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration + cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger) + // Perform The Test (Create The KafkaChannel Controller) environment, err := controllerenv.GetEnvironment(logger.Desugar()) assert.Nil(t, err) ctx = context.WithValue(ctx, controllerenv.Key{}, environment) - controller := NewController(ctx, nil) + controller := NewController(ctx, cmw) // Verify The Results assert.NotNil(t, controller) diff --git a/pkg/channel/distributed/controller/kafkachannel/reconciler.go b/pkg/channel/distributed/controller/kafkachannel/reconciler.go index 779e53cf42..8a9929b10c 100644 --- a/pkg/channel/distributed/controller/kafkachannel/reconciler.go +++ b/pkg/channel/distributed/controller/kafkachannel/reconciler.go @@ -30,7 +30,6 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" kafkav1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin/types" kafkasarama "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama" @@ -42,6 +41,7 @@ import ( "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" kafkalisters "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1" "knative.dev/eventing-kafka/pkg/common/client" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" commonconstants "knative.dev/eventing-kafka/pkg/common/constants" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" @@ -55,13 +55,13 @@ type Reconciler struct { adminClientType types.AdminClientType adminClient types.AdminClientInterface environment *env.Environment - config *config.EventingKafkaConfig + config *commonconfig.EventingKafkaConfig saramaConfig *sarama.Config kafkachannelLister kafkalisters.KafkaChannelLister kafkachannelInformer cache.SharedIndexInformer deploymentLister appsv1listers.DeploymentLister serviceLister corev1listers.ServiceLister - configObserver config.LoggingObserver + configObserver commonconfig.LoggingObserver adminMutex *sync.Mutex kafkaSecret string kafkaBrokers string diff --git a/pkg/channel/distributed/controller/kafkachannel/reconciler_test.go b/pkg/channel/distributed/controller/kafkachannel/reconciler_test.go index 12dc88a4c5..7d3fbc849b 100644 --- a/pkg/channel/distributed/controller/kafkachannel/reconciler_test.go +++ b/pkg/channel/distributed/controller/kafkachannel/reconciler_test.go @@ -30,11 +30,11 @@ import ( kafkav1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" kafkaadmintesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin/testing" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin/types" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/event" controllertesting "knative.dev/eventing-kafka/pkg/channel/distributed/controller/testing" fakekafkaclient "knative.dev/eventing-kafka/pkg/client/injection/client/fake" kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" duckv1 "knative.dev/pkg/apis/duck/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" diff --git a/pkg/channel/distributed/controller/kafkasecret/controller_test.go b/pkg/channel/distributed/controller/kafkasecret/controller_test.go index a0536a0458..21005a91a3 100644 --- a/pkg/channel/distributed/controller/kafkasecret/controller_test.go +++ b/pkg/channel/distributed/controller/kafkasecret/controller_test.go @@ -26,12 +26,12 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/client-go/rest" commonenv "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" controllerenv "knative.dev/eventing-kafka/pkg/channel/distributed/controller/env" _ "knative.dev/eventing-kafka/pkg/channel/distributed/controller/kafkasecretinformer/fake" // Knative Fake Informer Injection controllertesting "knative.dev/eventing-kafka/pkg/channel/distributed/controller/testing" fakeKafkaClient "knative.dev/eventing-kafka/pkg/client/injection/client/fake" _ "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" // Knative Fake Informer Injection + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake" // Knative Fake Informer Injection _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" // Knative Fake Informer Injection diff --git a/pkg/channel/distributed/controller/kafkasecret/reconciler.go b/pkg/channel/distributed/controller/kafkasecret/reconciler.go index 4f232e83a0..05f7e3ed42 100644 --- a/pkg/channel/distributed/controller/kafkasecret/reconciler.go +++ b/pkg/channel/distributed/controller/kafkasecret/reconciler.go @@ -25,13 +25,13 @@ import ( "k8s.io/client-go/kubernetes" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/env" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/event" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/kafkasecretinjection" "knative.dev/eventing-kafka/pkg/client/clientset/versioned" kafkalisters "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" ) @@ -39,7 +39,7 @@ import ( // Reconciler Implements controller.Reconciler for K8S Secrets Containing Kafka Auth (Labelled) type Reconciler struct { kubeClientset kubernetes.Interface - config *config.EventingKafkaConfig + config *commonconfig.EventingKafkaConfig environment *env.Environment kafkaChannelClient versioned.Interface kafkachannelLister kafkalisters.KafkaChannelLister diff --git a/pkg/channel/distributed/controller/kafkasecret/reconciler_test.go b/pkg/channel/distributed/controller/kafkasecret/reconciler_test.go index 5a0665fe1d..5bba6595cc 100644 --- a/pkg/channel/distributed/controller/kafkasecret/reconciler_test.go +++ b/pkg/channel/distributed/controller/kafkasecret/reconciler_test.go @@ -21,7 +21,7 @@ import ( "testing" clientgotesting "k8s.io/client-go/testing" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/pkg/channel/distributed/controller/kafkasecretinformer/kafkasecretinformer_test.go b/pkg/channel/distributed/controller/kafkasecretinformer/kafkasecretinformer_test.go index 3d3269d8ec..39d445f742 100644 --- a/pkg/channel/distributed/controller/kafkasecretinformer/kafkasecretinformer_test.go +++ b/pkg/channel/distributed/controller/kafkasecretinformer/kafkasecretinformer_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "github.com/stretchr/testify/assert" "k8s.io/client-go/kubernetes/fake" diff --git a/pkg/channel/distributed/controller/testing/data.go b/pkg/channel/distributed/controller/testing/data.go index 20b7f95b02..d5c8fb70c4 100644 --- a/pkg/channel/distributed/controller/testing/data.go +++ b/pkg/channel/distributed/controller/testing/data.go @@ -29,16 +29,16 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" clientgotesting "k8s.io/client-go/testing" kafkav1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" commonenv "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" "knative.dev/eventing-kafka/pkg/channel/distributed/common/health" kafkaconstants "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/constants" kafkautil "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/util" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/env" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/event" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/util" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "knative.dev/eventing/pkg/apis/messaging" "knative.dev/pkg/apis" "knative.dev/pkg/logging" @@ -197,7 +197,7 @@ func WithoutResources(deployment *appsv1.Deployment) { // Set The Required Environment Variables func NewEnvironment() *env.Environment { return &env.Environment{ - SystemNamespace: testing.SystemNamespace, + SystemNamespace: commontesting.SystemNamespace, ServiceAccount: ServiceAccount, MetricsPort: MetricsPort, MetricsDomain: MetricsDomain, @@ -208,13 +208,13 @@ func NewEnvironment() *env.Environment { } // KafkaConfigOption Enables Customization Of An EventingKafkaConfig -type KafkaConfigOption func(kafkaConfig *config.EventingKafkaConfig) +type KafkaConfigOption func(kafkaConfig *commonconfig.EventingKafkaConfig) // Set The Required Config Fields -func NewConfig(options ...KafkaConfigOption) *config.EventingKafkaConfig { - kafkaConfig := &config.EventingKafkaConfig{ - Dispatcher: config.EKDispatcherConfig{ - EKKubernetesConfig: config.EKKubernetesConfig{ +func NewConfig(options ...KafkaConfigOption) *commonconfig.EventingKafkaConfig { + kafkaConfig := &commonconfig.EventingKafkaConfig{ + Dispatcher: commonconfig.EKDispatcherConfig{ + EKKubernetesConfig: commonconfig.EKKubernetesConfig{ Replicas: DispatcherReplicas, CpuLimit: resource.MustParse(DispatcherCpuLimit), CpuRequest: resource.MustParse(DispatcherCpuRequest), @@ -222,8 +222,8 @@ func NewConfig(options ...KafkaConfigOption) *config.EventingKafkaConfig { MemoryRequest: resource.MustParse(DispatcherMemoryRequest), }, }, - Receiver: config.EKReceiverConfig{ - EKKubernetesConfig: config.EKKubernetesConfig{ + Receiver: commonconfig.EKReceiverConfig{ + EKKubernetesConfig: commonconfig.EKKubernetesConfig{ Replicas: ReceiverReplicas, CpuLimit: resource.MustParse(ReceiverCpuLimit), CpuRequest: resource.MustParse(ReceiverCpuRequest), @@ -231,8 +231,8 @@ func NewConfig(options ...KafkaConfigOption) *config.EventingKafkaConfig { MemoryRequest: resource.MustParse(ReceiverMemoryRequest), }, }, - Kafka: config.EKKafkaConfig{ - Topic: config.EKKafkaTopicConfig{ + Kafka: commonconfig.EKKafkaConfig{ + Topic: commonconfig.EKKafkaTopicConfig{ DefaultNumPartitions: DefaultNumPartitions, DefaultReplicationFactor: DefaultReplicationFactor, DefaultRetentionMillis: DefaultRetentionMillis, @@ -250,7 +250,7 @@ func NewConfig(options ...KafkaConfigOption) *config.EventingKafkaConfig { } // Remove The Receiver Resource Requests And Limits -func WithNoReceiverResources(kafkaConfig *config.EventingKafkaConfig) { +func WithNoReceiverResources(kafkaConfig *commonconfig.EventingKafkaConfig) { kafkaConfig.Receiver.EKKubernetesConfig.CpuLimit = resource.Quantity{} kafkaConfig.Receiver.EKKubernetesConfig.CpuRequest = resource.Quantity{} kafkaConfig.Receiver.EKKubernetesConfig.MemoryLimit = resource.Quantity{} @@ -258,7 +258,7 @@ func WithNoReceiverResources(kafkaConfig *config.EventingKafkaConfig) { } // Remove The Dispatcher Resource Requests And Limits -func WithNoDispatcherResources(kafkaConfig *config.EventingKafkaConfig) { +func WithNoDispatcherResources(kafkaConfig *commonconfig.EventingKafkaConfig) { kafkaConfig.Dispatcher.EKKubernetesConfig.CpuLimit = resource.Quantity{} kafkaConfig.Dispatcher.EKKubernetesConfig.CpuRequest = resource.Quantity{} kafkaConfig.Dispatcher.EKKubernetesConfig.MemoryLimit = resource.Quantity{} diff --git a/pkg/channel/distributed/controller/util/channel.go b/pkg/channel/distributed/controller/util/channel.go index ef344f576b..136ad3640b 100644 --- a/pkg/channel/distributed/controller/util/channel.go +++ b/pkg/channel/distributed/controller/util/channel.go @@ -23,8 +23,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" kafkav1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/pkg/network" ) @@ -81,7 +81,7 @@ func ChannelHostName(channelName, channelNamespace string) string { } // Utility Function To Get The NumPartitions - First From Channel Spec And Then From ConfigMap-Provided Settings -func NumPartitions(channel *kafkav1beta1.KafkaChannel, configuration *config.EventingKafkaConfig, logger *zap.Logger) int32 { +func NumPartitions(channel *kafkav1beta1.KafkaChannel, configuration *commonconfig.EventingKafkaConfig, logger *zap.Logger) int32 { value := channel.Spec.NumPartitions if value <= 0 { logger.Debug("Kafka Channel Spec 'NumPartitions' Not Specified - Using Default", zap.Int32("Value", configuration.Kafka.Topic.DefaultNumPartitions)) @@ -91,7 +91,7 @@ func NumPartitions(channel *kafkav1beta1.KafkaChannel, configuration *config.Eve } // Utility Function To Get The ReplicationFactor - First From Channel Spec And Then From ConfigMap-Provided Settings -func ReplicationFactor(channel *kafkav1beta1.KafkaChannel, configuration *config.EventingKafkaConfig, logger *zap.Logger) int16 { +func ReplicationFactor(channel *kafkav1beta1.KafkaChannel, configuration *commonconfig.EventingKafkaConfig, logger *zap.Logger) int16 { value := channel.Spec.ReplicationFactor if value <= 0 { logger.Debug("Kafka Channel Spec 'ReplicationFactor' Not Specified - Using Default", zap.Int16("Value", configuration.Kafka.Topic.DefaultReplicationFactor)) @@ -101,7 +101,7 @@ func ReplicationFactor(channel *kafkav1beta1.KafkaChannel, configuration *config } // Utility Function To Get The RetentionMillis - First From Channel Spec And Then From ConfigMap-Provided Settings -func RetentionMillis(channel *kafkav1beta1.KafkaChannel, configuration *config.EventingKafkaConfig, logger *zap.Logger) int64 { +func RetentionMillis(channel *kafkav1beta1.KafkaChannel, configuration *commonconfig.EventingKafkaConfig, logger *zap.Logger) int64 { // // TODO - The eventing-contrib KafkaChannel CRD does not include RetentionMillis so we're // currently just using the default value specified in Controller Environment Variables. diff --git a/pkg/channel/distributed/controller/util/channel_test.go b/pkg/channel/distributed/controller/util/channel_test.go index 4e4f98776a..31cce7934e 100644 --- a/pkg/channel/distributed/controller/util/channel_test.go +++ b/pkg/channel/distributed/controller/util/channel_test.go @@ -24,8 +24,8 @@ import ( "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kafkav1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" logtesting "knative.dev/pkg/logging/testing" ) @@ -120,7 +120,7 @@ func TestNumPartitions(t *testing.T) { logger := logtesting.TestLogger(t).Desugar() // Test Data - configuration := &config.EventingKafkaConfig{Kafka: config.EKKafkaConfig{Topic: config.EKKafkaTopicConfig{DefaultNumPartitions: defaultNumPartitions}}} + configuration := &commonconfig.EventingKafkaConfig{Kafka: commonconfig.EKKafkaConfig{Topic: commonconfig.EKKafkaTopicConfig{DefaultNumPartitions: defaultNumPartitions}}} // Test The Default Failover Use Case channel := &kafkav1beta1.KafkaChannel{} @@ -140,7 +140,7 @@ func TestReplicationFactor(t *testing.T) { logger := logtesting.TestLogger(t).Desugar() // Test Data - configuration := &config.EventingKafkaConfig{Kafka: config.EKKafkaConfig{Topic: config.EKKafkaTopicConfig{DefaultReplicationFactor: defaultReplicationFactor}}} + configuration := &commonconfig.EventingKafkaConfig{Kafka: commonconfig.EKKafkaConfig{Topic: commonconfig.EKKafkaTopicConfig{DefaultReplicationFactor: defaultReplicationFactor}}} // Test The Default Failover Use Case channel := &kafkav1beta1.KafkaChannel{} @@ -160,7 +160,7 @@ func TestRetentionMillis(t *testing.T) { logger := logtesting.TestLogger(t).Desugar() // Test Data - configuration := &config.EventingKafkaConfig{Kafka: config.EKKafkaConfig{Topic: config.EKKafkaTopicConfig{DefaultRetentionMillis: defaultRetentionMillis}}} + configuration := &commonconfig.EventingKafkaConfig{Kafka: commonconfig.EKKafkaConfig{Topic: commonconfig.EKKafkaTopicConfig{DefaultRetentionMillis: defaultRetentionMillis}}} // Test The Default Failover Use Case channel := &kafkav1beta1.KafkaChannel{} diff --git a/pkg/channel/distributed/dispatcher/dispatcher/dispatcher.go b/pkg/channel/distributed/dispatcher/dispatcher/dispatcher.go index 3453e1a6a1..b813fabe66 100644 --- a/pkg/channel/distributed/dispatcher/dispatcher/dispatcher.go +++ b/pkg/channel/distributed/dispatcher/dispatcher/dispatcher.go @@ -22,7 +22,8 @@ import ( "strings" "sync" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" + distributedcommonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -337,7 +338,7 @@ func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secre // Debug Log The Secret Change d.Logger.Debug("New Secret Received", zap.String("secret.Name", secret.ObjectMeta.Name)) - kafkaAuthCfg := config.GetAuthConfigFromSecret(secret) + kafkaAuthCfg := distributedcommonconfig.GetAuthConfigFromSecret(secret) if kafkaAuthCfg == nil { d.Logger.Warn("No auth config found in secret; ignoring update") return nil @@ -366,7 +367,7 @@ func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secre } // Shut down the current dispatcher and recreate it with new settings -func (d *DispatcherImpl) reconfigure(newConfig *sarama.Config, ekConfig *config.EventingKafkaConfig) Dispatcher { +func (d *DispatcherImpl) reconfigure(newConfig *sarama.Config, ekConfig *commonconfig.EventingKafkaConfig) Dispatcher { d.Shutdown() d.DispatcherConfig.SaramaConfig = newConfig if ekConfig != nil { diff --git a/pkg/channel/distributed/dispatcher/dispatcher/dispatcher_test.go b/pkg/channel/distributed/dispatcher/dispatcher/dispatcher_test.go index 4fe9dd1917..e0f9c03ea8 100644 --- a/pkg/channel/distributed/dispatcher/dispatcher/dispatcher_test.go +++ b/pkg/channel/distributed/dispatcher/dispatcher/dispatcher_test.go @@ -26,13 +26,13 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - commonconfigtesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/config/testing" - configtesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/config/testing" consumertesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/consumer/testing" consumerwrapper "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/consumer/wrapper" "knative.dev/eventing-kafka/pkg/channel/distributed/common/metrics" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" commonclient "knative.dev/eventing-kafka/pkg/common/client" + clienttesting "knative.dev/eventing-kafka/pkg/common/client/testing" + configtesting "knative.dev/eventing-kafka/pkg/common/config/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" @@ -122,8 +122,8 @@ func TestUpdateSubscriptions(t *testing.T) { defer consumertesting.RestoreNewConsumerGroupFn() // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} - config, err := commonclient.NewConfigBuilder().WithDefaults().FromYaml(commonconfigtesting.DefaultSaramaConfigYaml).Build(ctx) + brokers := []string{configtesting.DefaultKafkaBroker} + config, err := commonclient.NewConfigBuilder().WithDefaults().FromYaml(clienttesting.DefaultSaramaConfigYaml).Build(ctx) assert.Nil(t, err) dispatcherConfig := DispatcherConfig{ @@ -290,10 +290,10 @@ func TestConfigChanged(t *testing.T) { commontesting.SetTestEnvironment(t) // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} + brokers := []string{configtesting.DefaultKafkaBroker} baseSaramaConfig, err := commonclient.NewConfigBuilder(). WithDefaults(). - FromYaml(commonconfigtesting.DefaultSaramaConfigYaml). + FromYaml(clienttesting.DefaultSaramaConfigYaml). WithVersion(&sarama.V2_0_0_0). Build(ctx) assert.Nil(t, err) @@ -389,17 +389,17 @@ func TestSecretChanged(t *testing.T) { commontesting.SetTestEnvironment(t) // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} + brokers := []string{configtesting.DefaultKafkaBroker} auth := &commonclient.KafkaAuthConfig{ SASL: &commonclient.KafkaSaslConfig{ - User: commonconfigtesting.DefaultSecretUsername, - Password: commonconfigtesting.DefaultSecretPassword, - SaslType: commonconfigtesting.DefaultSecretSaslType, + User: configtesting.DefaultSecretUsername, + Password: configtesting.DefaultSecretPassword, + SaslType: configtesting.DefaultSecretSaslType, }, } baseSaramaConfig, err := commonclient.NewConfigBuilder(). WithDefaults(). - FromYaml(commonconfigtesting.DefaultSaramaConfigYaml). + FromYaml(clienttesting.DefaultSaramaConfigYaml). WithVersion(&sarama.V2_0_0_0). WithAuth(auth). Build(ctx) diff --git a/pkg/channel/distributed/receiver/producer/producer.go b/pkg/channel/distributed/receiver/producer/producer.go index 5663758692..7dca2eb2c1 100644 --- a/pkg/channel/distributed/receiver/producer/producer.go +++ b/pkg/channel/distributed/receiver/producer/producer.go @@ -29,7 +29,7 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" + distributedcommonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config" "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/producer" kafkasarama "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama" "knative.dev/eventing-kafka/pkg/channel/distributed/common/metrics" @@ -37,6 +37,7 @@ import ( "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/health" "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/util" "knative.dev/eventing-kafka/pkg/common/client" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" commonconstants "knative.dev/eventing-kafka/pkg/common/constants" "knative.dev/eventing-kafka/pkg/common/tracing" eventingChannel "knative.dev/eventing/pkg/channel" @@ -240,7 +241,7 @@ func (p *Producer) SecretChanged(ctx context.Context, secret *corev1.Secret) *Pr // Debug Log The Secret Change p.logger.Debug("New Secret Received", zap.String("secret.Name", secret.ObjectMeta.Name)) - kafkaAuthCfg := config.GetAuthConfigFromSecret(secret) + kafkaAuthCfg := distributedcommonconfig.GetAuthConfigFromSecret(secret) if kafkaAuthCfg == nil { p.logger.Warn("No auth config found in secret; ignoring update") return nil @@ -288,7 +289,7 @@ func (p *Producer) Close() { } // Shut down the current producer and recreate it with new settings -func (p *Producer) reconfigure(newConfig *sarama.Config, ekConfig *config.EventingKafkaConfig) *Producer { +func (p *Producer) reconfigure(newConfig *sarama.Config, ekConfig *commonconfig.EventingKafkaConfig) *Producer { p.Close() if ekConfig != nil { // Currently the only thing that a new producer might care about in the EventingKafkaConfig is the Brokers diff --git a/pkg/channel/distributed/receiver/producer/producer_test.go b/pkg/channel/distributed/receiver/producer/producer_test.go index ab077565a1..b7fb1f03bf 100644 --- a/pkg/channel/distributed/receiver/producer/producer_test.go +++ b/pkg/channel/distributed/receiver/producer/producer_test.go @@ -24,15 +24,15 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - commonconfigtesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/config/testing" - configtesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/config/testing" producertesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/producer/testing" "knative.dev/eventing-kafka/pkg/channel/distributed/common/metrics" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/constants" channelhealth "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/health" receivertesting "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/testing" commonclient "knative.dev/eventing-kafka/pkg/common/client" + clienttesting "knative.dev/eventing-kafka/pkg/common/client/testing" + configtesting "knative.dev/eventing-kafka/pkg/common/config/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" ) @@ -41,7 +41,7 @@ import ( func TestNewProducer(t *testing.T) { // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} + brokers := []string{configtesting.DefaultKafkaBroker} config := sarama.NewConfig() // Create A Mock Kafka SyncProducer @@ -62,7 +62,7 @@ func TestNewProducer(t *testing.T) { func TestProduceKafkaMessage(t *testing.T) { // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} + brokers := []string{configtesting.DefaultKafkaBroker} config := sarama.NewConfig() channelReference := receivertesting.CreateChannelReference(receivertesting.ChannelName, receivertesting.ChannelNamespace) bindingMessage := receivertesting.CreateBindingMessage(cloudevents.VersionV1) @@ -111,8 +111,8 @@ func TestConfigChanged(t *testing.T) { commontesting.SetTestEnvironment(t) // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} - baseSaramaConfig, err := commonclient.NewConfigBuilder().WithDefaults().FromYaml(commonconfigtesting.DefaultSaramaConfigYaml).Build(ctx) + brokers := []string{configtesting.DefaultKafkaBroker} + baseSaramaConfig, err := commonclient.NewConfigBuilder().WithDefaults().FromYaml(clienttesting.DefaultSaramaConfigYaml).Build(ctx) assert.Nil(t, err) // Define The TestCase Struct @@ -213,15 +213,15 @@ func TestSecretChanged(t *testing.T) { commontesting.SetTestEnvironment(t) // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} + brokers := []string{configtesting.DefaultKafkaBroker} auth := &commonclient.KafkaAuthConfig{ SASL: &commonclient.KafkaSaslConfig{ - User: commonconfigtesting.DefaultSecretUsername, - Password: commonconfigtesting.DefaultSecretPassword, - SaslType: commonconfigtesting.DefaultSecretSaslType, + User: configtesting.DefaultSecretUsername, + Password: configtesting.DefaultSecretPassword, + SaslType: configtesting.DefaultSecretSaslType, }, } - baseSaramaConfig, err := commonclient.NewConfigBuilder().WithDefaults().FromYaml(commonconfigtesting.DefaultSaramaConfigYaml).WithAuth(auth).Build(ctx) + baseSaramaConfig, err := commonclient.NewConfigBuilder().WithDefaults().FromYaml(clienttesting.DefaultSaramaConfigYaml).WithAuth(auth).Build(ctx) assert.Nil(t, err) // Define The TestCase Struct @@ -315,7 +315,7 @@ func TestSecretChanged(t *testing.T) { func TestClose(t *testing.T) { // Test Data - brokers := []string{commonconfigtesting.DefaultKafkaBroker} + brokers := []string{configtesting.DefaultKafkaBroker} config := sarama.NewConfig() // Create A Mock Kafka SyncProducer diff --git a/pkg/common/client/config_test.go b/pkg/common/client/config_test.go index b4009f881f..242c1ed1ca 100644 --- a/pkg/common/client/config_test.go +++ b/pkg/common/client/config_test.go @@ -33,7 +33,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" ) diff --git a/pkg/common/client/testing/util.go b/pkg/common/client/testing/util.go new file mode 100644 index 0000000000..f0bfaa4469 --- /dev/null +++ b/pkg/common/client/testing/util.go @@ -0,0 +1,49 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +// Constants +const ( + DefaultSaramaConfigYaml = ` +Version: 2.0.0 +Admin: + Timeout: 10000000000 +Net: + KeepAlive: 30000000000 + MaxOpenRequests: 1 + TLS: + Enable: true + SASL: + Enable: true + Mechanism: PLAIN + Version: 1 +Metadata: + RefreshFrequency: 300000000000 +Consumer: + Offsets: + AutoCommit: + Interval: 5000000000 + Retention: 604800000000000 + Return: + Errors: true +Producer: + Idempotent: true + RequiredAcks: -1 + Return: + Successes: true +` +) diff --git a/pkg/channel/distributed/common/config/configwatcher.go b/pkg/common/config/eventingkafkaconfig.go similarity index 59% rename from pkg/channel/distributed/common/config/configwatcher.go rename to pkg/common/config/eventingkafkaconfig.go index c8a64fb273..c77bc93c84 100644 --- a/pkg/channel/distributed/common/config/configwatcher.go +++ b/pkg/common/config/eventingkafkaconfig.go @@ -17,21 +17,9 @@ limitations under the License. package config import ( - "context" - - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing-kafka/pkg/common/constants" - kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/injection/sharedmain" ) -// This function type is for a shim so that we can pass our own logger to the Observer function -type LoggingObserver func(ctx context.Context, configMap *corev1.ConfigMap) - // The EventingKafkaConfig and these EK sub-structs contain our custom configuration settings, // stored in the config-kafka configmap. The sub-structs are explicitly declared so that they // can have their own JSON tags in the overall EventingKafkaConfig @@ -74,31 +62,3 @@ type EventingKafkaConfig struct { Dispatcher EKDispatcherConfig `json:"dispatcher,omitempty"` Kafka EKKafkaConfig `json:"kafka,omitempty"` } - -// -// Initialize The Specified Context With A ConfigMap Watcher -// Much Of This Function Is Taken From The knative.dev sharedmain Package -// -func InitializeConfigWatcher(ctx context.Context, logger *zap.SugaredLogger, handler LoggingObserver, namespace string) error { - - // Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration - // Since this is designed to be called by the main() function, the default KNative package behavior here - // is a fatal exit if the watch cannot be set up. - watcher := sharedmain.SetupConfigMapWatchOrDie(ctx, logger) - - // Start The ConfigMap Watcher - // Taken from knative.dev/pkg/injection/sharedmain/main.go::WatchObservabilityConfigOrDie - if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(namespace).Get(ctx, constants.SettingsConfigMapName, metav1.GetOptions{}); err == nil { - watcher.Watch(constants.SettingsConfigMapName, func(configmap *corev1.ConfigMap) { handler(ctx, configmap) }) - } else if !apierrors.IsNotFound(err) { - logger.Error("Error reading ConfigMap "+constants.SettingsConfigMapName, zap.Error(err)) - return err - } - - if err := watcher.Start(ctx.Done()); err != nil { - logger.Error("Failed to start configuration watcher", zap.Error(err)) - return err - } - - return nil -} diff --git a/pkg/common/config/kafkaconfigmapwatcher.go b/pkg/common/config/kafkaconfigmapwatcher.go new file mode 100644 index 0000000000..2e322e9c09 --- /dev/null +++ b/pkg/common/config/kafkaconfigmapwatcher.go @@ -0,0 +1,49 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "context" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing-kafka/pkg/common/constants" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/configmap" +) + +// This function type is for a shim so that we can pass our own logger to the Observer function +type LoggingObserver func(ctx context.Context, configMap *corev1.ConfigMap) + +// +// Initialize The Specified Context With A ConfigMap Watcher +// Much Of This Function Is Taken From The knative.dev sharedmain Package +// +func InitializeKafkaConfigMapWatcher(ctx context.Context, watcher configmap.Watcher, logger *zap.SugaredLogger, handler LoggingObserver, namespace string) error { + // Start The ConfigMap Watcher + // Taken from knative.dev/pkg/injection/sharedmain/main.go::WatchObservabilityConfigOrDie + if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(namespace).Get(ctx, constants.SettingsConfigMapName, metav1.GetOptions{}); err == nil { + watcher.Watch(constants.SettingsConfigMapName, func(configmap *corev1.ConfigMap) { handler(ctx, configmap) }) + } else if !apierrors.IsNotFound(err) { + logger.Error("Error reading ConfigMap "+constants.SettingsConfigMapName, zap.Error(err)) + return err + } + + return nil +} diff --git a/pkg/channel/distributed/common/config/configwatcher_test.go b/pkg/common/config/kafkaconfigmapwatcher_test.go similarity index 84% rename from pkg/channel/distributed/common/config/configwatcher_test.go rename to pkg/common/config/kafkaconfigmapwatcher_test.go index fda8d2d86f..7d0c15face 100644 --- a/pkg/channel/distributed/common/config/configwatcher_test.go +++ b/pkg/common/config/kafkaconfigmapwatcher_test.go @@ -23,12 +23,14 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" - commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing" "knative.dev/eventing-kafka/pkg/common/constants" + commontesting "knative.dev/eventing-kafka/pkg/common/testing" injectionclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/injection/sharedmain" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/system" ) @@ -39,22 +41,22 @@ var ( configMapMutex = sync.Mutex{} // Don't trip up the data race examiner during tests ) -// Test The InitializeConfigWatcher() Functionality -func TestInitializeConfigWatcher(t *testing.T) { +// Test The InitializeKafkaConfigMapWatcher() Functionality +func TestInitializeKafkaConfigMapWatcher(t *testing.T) { - // Obtain a Test Logger (Required By be InitializeConfigWatcher function) + // Obtain a Test Logger (Required By be InitializeKafkaConfigMapWatcher function) logger := logtesting.TestLogger(t) // Setup Environment commontesting.SetTestEnvironment(t) - // Create A Test Sarama ConfigMap For The InitializeConfigWatcher() Call To Watch + // Create A Test Sarama ConfigMap For The InitializeKafkaConfigMapWatcher() Call To Watch configMap := commontesting.GetTestSaramaConfigMap(commontesting.OldSaramaConfig, commontesting.TestEKConfig) // Create The Fake K8S Client And Add It To The ConfigMap fakeK8sClient := fake.NewSimpleClientset(configMap) - // Add The Fake K8S Client To The Context (Required By InitializeConfigWatcher) + // Add The Fake K8S Client To The Context (Required By InitializeKafkaConfigMapWatcher) ctx := context.WithValue(context.TODO(), injectionclient.Key{}, fakeK8sClient) // The configWatcherHandler should change the nil "watchedConfigMap" to a valid ConfigMap when the watcher triggers @@ -63,10 +65,16 @@ func TestInitializeConfigWatcher(t *testing.T) { assert.Nil(t, err) assert.Equal(t, testConfigMap.Data["sarama"], commontesting.OldSaramaConfig) + cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger) + // Perform The Test (Initialize The Config Watcher) - err = InitializeConfigWatcher(ctx, logger, configWatcherHandler, system.Namespace()) + err = InitializeKafkaConfigMapWatcher(ctx, cmw, logger, configWatcherHandler, system.Namespace()) assert.Nil(t, err) + if err := cmw.Start(ctx.Done()); err != nil { + logger.Fatal("Failed to start configmap watcher", zap.Error(err)) + } + // Note that this initial change to the watched map is not part of the default Kubernetes watching logic. // The underlying KNative InformedWatcher Start() function that is called as part of the // SetupConfigMapWatchOrDie code "pretends" that all watched resources were just created. diff --git a/pkg/channel/distributed/common/config/testing/util.go b/pkg/common/config/testing/util.go similarity index 92% rename from pkg/channel/distributed/common/config/testing/util.go rename to pkg/common/config/testing/util.go index fddca255a2..ed3eb0a0bc 100644 --- a/pkg/channel/distributed/common/config/testing/util.go +++ b/pkg/common/config/testing/util.go @@ -22,40 +22,13 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kafkaconstants "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/constants" + clienttesting "knative.dev/eventing-kafka/pkg/common/client/testing" "knative.dev/eventing-kafka/pkg/common/constants" "knative.dev/pkg/system" ) // Constants const ( - DefaultSaramaConfigYaml = ` -Version: 2.0.0 -Admin: - Timeout: 10000000000 -Net: - KeepAlive: 30000000000 - MaxOpenRequests: 1 - TLS: - Enable: true - SASL: - Enable: true - Mechanism: PLAIN - Version: 1 -Metadata: - RefreshFrequency: 300000000000 -Consumer: - Offsets: - AutoCommit: - Interval: 5000000000 - Retention: 604800000000000 - Return: - Errors: true -Producer: - Idempotent: true - RequiredAcks: -1 - Return: - Successes: true -` DefaultEventingKafkaConfigYaml = ` receiver: cpuRequest: 100m @@ -98,7 +71,7 @@ func NewKafkaConfigMap(options ...KafkaConfigMapOption) *corev1.ConfigMap { Namespace: system.Namespace(), }, Data: map[string]string{ - constants.SaramaSettingsConfigKey: DefaultSaramaConfigYaml, + constants.SaramaSettingsConfigKey: clienttesting.DefaultSaramaConfigYaml, constants.EventingKafkaSettingsConfigKey: DefaultEventingKafkaConfigYaml, }, } diff --git a/pkg/channel/distributed/common/testing/constants.go b/pkg/common/testing/constants.go similarity index 100% rename from pkg/channel/distributed/common/testing/constants.go rename to pkg/common/testing/constants.go diff --git a/pkg/channel/distributed/common/testing/util.go b/pkg/common/testing/util.go similarity index 100% rename from pkg/channel/distributed/common/testing/util.go rename to pkg/common/testing/util.go