diff --git a/cmd/karmada-search/app/karmada-search.go b/cmd/karmada-search/app/karmada-search.go index 664ac16385ce..f35734138973 100644 --- a/cmd/karmada-search/app/karmada-search.go +++ b/cmd/karmada-search/app/karmada-search.go @@ -2,19 +2,46 @@ package app import ( "context" + "fmt" + "net" + "net/http" + "path" + "time" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/openapi" + "k8s.io/apiserver/pkg/endpoints/request" + genericapiserver "k8s.io/apiserver/pkg/server" + genericfilters "k8s.io/apiserver/pkg/server/filters" + genericoptions "k8s.io/apiserver/pkg/server/options" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/term" + "k8s.io/klog/v2" + netutils "k8s.io/utils/net" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "github.com/karmada-io/karmada/cmd/karmada-search/app/options" + searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" + karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" + informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi" + "github.com/karmada-io/karmada/pkg/search" + "github.com/karmada-io/karmada/pkg/search/proxy" + "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" "github.com/karmada-io/karmada/pkg/sharedcli" "github.com/karmada-io/karmada/pkg/sharedcli/klogflag" + "github.com/karmada-io/karmada/pkg/sharedcli/profileflag" + "github.com/karmada-io/karmada/pkg/util/lifted" + "github.com/karmada-io/karmada/pkg/version" "github.com/karmada-io/karmada/pkg/version/sharedcommand" ) +// Option configures a framework.Registry. +type Option func(*runtime.Registry) + // NewKarmadaSearchCommand creates a *cobra.Command object with default parameters -func NewKarmadaSearchCommand(ctx context.Context) *cobra.Command { +func NewKarmadaSearchCommand(ctx context.Context, registryOptions ...Option) *cobra.Command { opts := options.NewOptions() cmd := &cobra.Command{ @@ -28,7 +55,7 @@ capabilities such as global search and resource proxy in a multi-cloud environme if err := opts.Validate(); err != nil { return err } - if err := opts.Run(ctx); err != nil { + if err := run(ctx, opts, registryOptions...); err != nil { return err } return nil @@ -52,3 +79,144 @@ capabilities such as global search and resource proxy in a multi-cloud environme sharedcli.SetUsageAndHelpFunc(cmd, fss, cols) return cmd } + +// WithPlugin creates an Option based on plugin factory. +// Please don't remove this function: it is used to register out-of-tree plugins, +// hence there are no references to it from the karmada-search code base. +func WithPlugin(factory runtime.PluginFactory) Option { + return func(registry *runtime.Registry) { + registry.Register(factory) + } +} + +// `run` runs the karmada-search with options. This should never exit. +func run(ctx context.Context, o *options.Options, registryOptions ...Option) error { + klog.Infof("karmada-search version: %s", version.Get()) + + profileflag.ListenAndServe(o.ProfileOpts) + + config, err := config(o, registryOptions...) + if err != nil { + return err + } + + server, err := config.Complete().New() + if err != nil { + return err + } + + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-informers", func(context genericapiserver.PostStartHookContext) error { + config.GenericConfig.SharedInformerFactory.Start(context.StopCh) + return nil + }) + + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error { + config.ExtraConfig.KarmadaSharedInformerFactory.Start(context.StopCh) + return nil + }) + + if config.ExtraConfig.Controller != nil { + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error { + // start ResourceRegistry controller + config.ExtraConfig.Controller.Start(context.StopCh) + return nil + }) + } + + if config.ExtraConfig.ProxyController != nil { + server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error { + config.ExtraConfig.ProxyController.Start(context.StopCh) + return nil + }) + + server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error { + config.ExtraConfig.ProxyController.Stop() + return nil + }) + } + + return server.GenericAPIServer.PrepareRun().Run(ctx.Done()) +} + +// `config` returns config for the api server given Options +func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Config, error) { + // TODO have a "real" external address + if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil { + return nil, fmt.Errorf("error creating self-signed certificates: %v", err) + } + + o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false} + + serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs) + serverConfig.LongRunningFunc = customLongRunningRequestCheck( + sets.NewString("watch", "proxy"), + sets.NewString("attach", "exec", "proxy", "log", "portforward")) + serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme)) + serverConfig.OpenAPIConfig.Info.Title = "karmada-search" + if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil { + return nil, err + } + + serverConfig.ClientConfig.QPS = o.KubeAPIQPS + serverConfig.ClientConfig.Burst = o.KubeAPIBurst + + restMapper, err := apiutil.NewDynamicRESTMapper(serverConfig.ClientConfig) + if err != nil { + klog.Errorf("Failed to create REST mapper: %v", err) + return nil, err + } + + karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig) + factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) + + var ctl *search.Controller + if !o.DisableSearch { + ctl, err = search.NewController(serverConfig.ClientConfig, factory, restMapper) + if err != nil { + return nil, err + } + } + + var proxyCtl *proxy.Controller + if !o.DisableProxy { + outOfTreeRegistry := make(runtime.Registry, 0, len(outOfTreeRegistryOptions)) + for _, option := range outOfTreeRegistryOptions { + option(&outOfTreeRegistry) + } + + proxyCtl, err = proxy.NewController(proxy.NewControllerOption{ + RestConfig: serverConfig.ClientConfig, + RestMapper: restMapper, + KubeFactory: serverConfig.SharedInformerFactory, + KarmadaFactory: factory, + MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout), + OutOfTreeRegistry: outOfTreeRegistry, + }) + + if err != nil { + return nil, err + } + } + + config := &search.Config{ + GenericConfig: serverConfig, + ExtraConfig: search.ExtraConfig{ + KarmadaSharedInformerFactory: factory, + Controller: ctl, + ProxyController: proxyCtl, + }, + } + return config, nil +} + +func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck { + return func(r *http.Request, requestInfo *request.RequestInfo) bool { + if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" { + reqClone := r.Clone(context.TODO()) + // requestInfo.Parts is like [proxying foo proxy api v1 nodes] + reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...) + requestInfo = lifted.NewRequestInfo(reqClone) + } + return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo) + } +} diff --git a/cmd/karmada-search/app/options/options.go b/cmd/karmada-search/app/options/options.go index 61c0f7912e97..f744b8503895 100644 --- a/cmd/karmada-search/app/options/options.go +++ b/cmd/karmada-search/app/options/options.go @@ -1,43 +1,21 @@ package options import ( - "context" - "fmt" - "net" - "net/http" - "path" - "time" - "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apiserver/pkg/endpoints/openapi" - "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" - genericapiserver "k8s.io/apiserver/pkg/server" - genericfilters "k8s.io/apiserver/pkg/server/filters" genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/klog/v2" - netutils "k8s.io/utils/net" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme" searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" - karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" - informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" - generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi" - "github.com/karmada-io/karmada/pkg/search" - "github.com/karmada-io/karmada/pkg/search/proxy" "github.com/karmada-io/karmada/pkg/sharedcli/profileflag" - "github.com/karmada-io/karmada/pkg/util/lifted" - "github.com/karmada-io/karmada/pkg/version" ) const defaultEtcdPathPrefix = "/registry" -// Options contains everything necessary to create and run karmada-search. +// Options contains command line parameters for karmada-search. type Options struct { RecommendedOptions *genericoptions.RecommendedOptions @@ -83,123 +61,3 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { func (o *Options) Complete() error { return nil } - -// Run runs the aggregated-apiserver with options. This should never exit. -func (o *Options) Run(ctx context.Context) error { - klog.Infof("karmada-search version: %s", version.Get()) - - profileflag.ListenAndServe(o.ProfileOpts) - - config, err := o.Config() - if err != nil { - return err - } - - server, err := config.Complete().New() - if err != nil { - return err - } - - server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-informers", func(context genericapiserver.PostStartHookContext) error { - config.GenericConfig.SharedInformerFactory.Start(context.StopCh) - return nil - }) - - server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error { - config.ExtraConfig.KarmadaSharedInformerFactory.Start(context.StopCh) - return nil - }) - - if config.ExtraConfig.Controller != nil { - server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error { - // start ResourceRegistry controller - config.ExtraConfig.Controller.Start(context.StopCh) - return nil - }) - } - - if config.ExtraConfig.ProxyController != nil { - server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error { - config.ExtraConfig.ProxyController.Start(context.StopCh) - return nil - }) - - server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error { - config.ExtraConfig.ProxyController.Stop() - return nil - }) - } - - return server.GenericAPIServer.PrepareRun().Run(ctx.Done()) -} - -// Config returns config for the api server given Options -func (o *Options) Config() (*search.Config, error) { - // TODO have a "real" external address - if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil { - return nil, fmt.Errorf("error creating self-signed certificates: %v", err) - } - - o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false} - - serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs) - serverConfig.LongRunningFunc = customLongRunningRequestCheck( - sets.NewString("watch", "proxy"), - sets.NewString("attach", "exec", "proxy", "log", "portforward")) - serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme)) - serverConfig.OpenAPIConfig.Info.Title = "karmada-search" - if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil { - return nil, err - } - - serverConfig.ClientConfig.QPS = o.KubeAPIQPS - serverConfig.ClientConfig.Burst = o.KubeAPIBurst - - restMapper, err := apiutil.NewDynamicRESTMapper(serverConfig.ClientConfig) - if err != nil { - klog.Errorf("Failed to create REST mapper: %v", err) - return nil, err - } - - karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig) - factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0) - - var ctl *search.Controller - if !o.DisableSearch { - ctl, err = search.NewController(serverConfig.ClientConfig, factory, restMapper) - if err != nil { - return nil, err - } - } - - var proxyCtl *proxy.Controller - if !o.DisableProxy { - proxyCtl, err = proxy.NewController(serverConfig.ClientConfig, restMapper, serverConfig.SharedInformerFactory, factory, - time.Second*time.Duration(serverConfig.Config.MinRequestTimeout)) - if err != nil { - return nil, err - } - } - - config := &search.Config{ - GenericConfig: serverConfig, - ExtraConfig: search.ExtraConfig{ - KarmadaSharedInformerFactory: factory, - Controller: ctl, - ProxyController: proxyCtl, - }, - } - return config, nil -} - -func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck { - return func(r *http.Request, requestInfo *request.RequestInfo) bool { - if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" { - reqClone := r.Clone(context.TODO()) - // requestInfo.Parts is like [proxying foo proxy api v1 nodes] - reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...) - requestInfo = lifted.NewRequestInfo(reqClone) - } - return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo) - } -} diff --git a/pkg/search/proxy/client_factory.go b/pkg/search/proxy/client_factory.go new file mode 100644 index 000000000000..b27987060ac2 --- /dev/null +++ b/pkg/search/proxy/client_factory.go @@ -0,0 +1,84 @@ +package proxy + +import ( + "fmt" + "net/http" + "net/url" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/dynamic" + listcorev1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/search/proxy/store" +) + +func newMultiClusterStore(clusterLister clusterlisters.ClusterLister, + secretLister listcorev1.SecretLister, restMapper meta.RESTMapper) store.Store { + clientFactory := &clientFactory{ + ClusterLister: clusterLister, + SecretLister: secretLister, + } + + return store.NewMultiClusterCache(clientFactory.DynamicClientForCluster, restMapper) +} + +type clientFactory struct { + ClusterLister clusterlisters.ClusterLister + SecretLister listcorev1.SecretLister +} + +// DynamicClientForCluster creates a dynamic client for required cluster. +// TODO: reuse with karmada/pkg/util/membercluster_client.go +func (factory *clientFactory) DynamicClientForCluster(clusterName string) (dynamic.Interface, error) { + cluster, err := factory.ClusterLister.Get(clusterName) + if err != nil { + return nil, err + } + + apiEndpoint := cluster.Spec.APIEndpoint + if apiEndpoint == "" { + return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName) + } + + if cluster.Spec.SecretRef == nil { + return nil, fmt.Errorf("cluster %s does not have a secret", clusterName) + } + + secret, err := factory.SecretLister.Secrets(cluster.Spec.SecretRef.Namespace).Get(cluster.Spec.SecretRef.Name) + if err != nil { + return nil, err + } + + token, tokenFound := secret.Data[clusterv1alpha1.SecretTokenKey] + if !tokenFound || len(token) == 0 { + return nil, fmt.Errorf("the secret for cluster %s is missing a non-empty value for %q", clusterName, clusterv1alpha1.SecretTokenKey) + } + + clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "") + if err != nil { + return nil, err + } + + clusterConfig.BearerToken = string(token) + + if cluster.Spec.InsecureSkipTLSVerification { + clusterConfig.TLSClientConfig.Insecure = true + } else { + clusterConfig.CAData = secret.Data[clusterv1alpha1.SecretCADataKey] + } + + if cluster.Spec.ProxyURL != "" { + proxy, err := url.Parse(cluster.Spec.ProxyURL) + if err != nil { + klog.Errorf("parse proxy error. %v", err) + return nil, err + } + clusterConfig.Proxy = http.ProxyURL(proxy) + } + + return dynamic.NewForConfig(clusterConfig) +} diff --git a/pkg/search/proxy/client_factory_test.go b/pkg/search/proxy/client_factory_test.go new file mode 100644 index 000000000000..7de24f450863 --- /dev/null +++ b/pkg/search/proxy/client_factory_test.go @@ -0,0 +1,246 @@ +package proxy + +import ( + "errors" + "testing" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" + karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" +) + +func TestClientFactory_dynamicClientForCluster(t *testing.T) { + // copy from go/src/net/http/internal/testcert/testcert.go + testCA := []byte(`-----BEGIN CERTIFICATE----- +MIIDOTCCAiGgAwIBAgIQSRJrEpBGFc7tNb1fb5pKFzANBgkqhkiG9w0BAQsFADAS +MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw +MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEA6Gba5tHV1dAKouAaXO3/ebDUU4rvwCUg/CNaJ2PT5xLD4N1Vcb8r +bFSW2HXKq+MPfVdwIKR/1DczEoAGf/JWQTW7EgzlXrCd3rlajEX2D73faWJekD0U +aUgz5vtrTXZ90BQL7WvRICd7FlEZ6FPOcPlumiyNmzUqtwGhO+9ad1W5BqJaRI6P +YfouNkwR6Na4TzSj5BrqUfP0FwDizKSJ0XXmh8g8G9mtwxOSN3Ru1QFc61Xyeluk +POGKBV/q6RBNklTNe0gI8usUMlYyoC7ytppNMW7X2vodAelSu25jgx2anj9fDVZu +h7AXF5+4nJS4AAt0n1lNY7nGSsdZas8PbQIDAQABo4GIMIGFMA4GA1UdDwEB/wQE +AwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MB0GA1Ud +DgQWBBStsdjh3/JCXXYlQryOrL4Sh7BW5TAuBgNVHREEJzAlggtleGFtcGxlLmNv +bYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAQEAxWGI +5NhpF3nwwy/4yB4i/CwwSpLrWUa70NyhvprUBC50PxiXav1TeDzwzLx/o5HyNwsv +cxv3HdkLW59i/0SlJSrNnWdfZ19oTcS+6PtLoVyISgtyN6DpkKpdG1cOkW3Cy2P2 ++tK/tKHRP1Y/Ra0RiDpOAmqn0gCOFGz8+lqDIor/T7MTpibL3IxqWfPrvfVRHL3B +grw/ZQTTIVjjh4JBSW3WyWgNo/ikC1lrVxzl4iPUGptxT36Cr7Zk2Bsg0XqwbOvK +5d+NTDREkSnUbie4GeutujmX3Dsx88UiV6UY/4lHJa6I5leHUNOHahRbpbWeOfs/ +WkBKOclmOV2xlTVuPw== +-----END CERTIFICATE-----`) + + type args struct { + clusters []runtime.Object + secrets []runtime.Object + } + + type want struct { + err error + } + + tests := []struct { + name string + args args + want want + }{ + { + name: "cluster not found", + args: args{ + clusters: nil, + secrets: nil, + }, + want: want{ + err: apierrors.NewNotFound(schema.GroupResource{Resource: "cluster", Group: "cluster.karmada.io"}, "test"), + }, + }, + { + name: "api endpoint is empty", + args: args{ + clusters: []runtime.Object{&clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1alpha1.ClusterSpec{}, + }}, + secrets: nil, + }, + want: want{ + err: errors.New("the api endpoint of cluster test is empty"), + }, + }, + { + name: "secret is empty", + args: args{ + clusters: []runtime.Object{&clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://localhost", + }, + }}, + secrets: nil, + }, + want: want{ + err: errors.New("cluster test does not have a secret"), + }, + }, + { + name: "secret not found", + args: args{ + clusters: []runtime.Object{&clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://localhost", + SecretRef: &clusterv1alpha1.LocalSecretReference{ + Namespace: "default", + Name: "test_secret", + }, + }, + }}, + secrets: nil, + }, + want: want{ + err: errors.New(`secret "test_secret" not found`), + }, + }, + { + name: "token not found", + args: args{ + clusters: []runtime.Object{&clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://localhost", + SecretRef: &clusterv1alpha1.LocalSecretReference{ + Namespace: "default", + Name: "test_secret", + }, + }, + }}, + secrets: []runtime.Object{&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test_secret", + }, + Data: map[string][]byte{}, + }}, + }, + want: want{ + err: errors.New(`the secret for cluster test is missing a non-empty value for "token"`), + }, + }, + { + name: "success", + args: args{ + clusters: []runtime.Object{&clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://localhost", + SecretRef: &clusterv1alpha1.LocalSecretReference{ + Namespace: "default", + Name: "test_secret", + }, + }, + }}, + secrets: []runtime.Object{&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test_secret", + }, + Data: map[string][]byte{ + clusterv1alpha1.SecretTokenKey: []byte("test_token"), + clusterv1alpha1.SecretCADataKey: testCA, + }, + }}, + }, + want: want{ + err: nil, + }, + }, + { + name: "has proxy", + args: args{ + clusters: []runtime.Object{&clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1alpha1.ClusterSpec{ + APIEndpoint: "https://localhost", + SecretRef: &clusterv1alpha1.LocalSecretReference{ + Namespace: "default", + Name: "test_secret", + }, + ProxyURL: "https://localhost", + }, + }}, + secrets: []runtime.Object{&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test_secret", + }, + Data: map[string][]byte{ + clusterv1alpha1.SecretTokenKey: []byte("test_token"), + clusterv1alpha1.SecretCADataKey: testCA, + }, + }}, + }, + want: want{ + err: nil, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kubeClient := fake.NewSimpleClientset(tt.args.secrets...) + kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) + + karmadaClient := karmadafake.NewSimpleClientset(tt.args.clusters...) + karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClient, 0) + + factory := &clientFactory{ + ClusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), + SecretLister: kubeFactory.Core().V1().Secrets().Lister(), + } + + stopCh := make(chan struct{}) + defer close(stopCh) + karmadaFactory.Start(stopCh) + karmadaFactory.WaitForCacheSync(stopCh) + kubeFactory.Start(stopCh) + kubeFactory.WaitForCacheSync(stopCh) + + client, err := factory.DynamicClientForCluster("test") + + if !proxytest.ErrorMessageEquals(err, tt.want.err) { + t.Errorf("got error %v, want %v", err, tt.want.err) + return + } + + if err != nil { + return + } + if client == nil { + t.Error("got client nil") + } + }) + } +} diff --git a/pkg/search/proxy/controller.go b/pkg/search/proxy/controller.go index 14e5c84f12a6..b86b1b02ef60 100644 --- a/pkg/search/proxy/controller.go +++ b/pkg/search/proxy/controller.go @@ -2,9 +2,7 @@ package proxy import ( "context" - "fmt" "net/http" - "net/url" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -15,19 +13,19 @@ import ( "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/scheme" listcorev1 "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" - clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" searchlisters "github.com/karmada-io/karmada/pkg/generated/listers/search/v1alpha1" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" "github.com/karmada-io/karmada/pkg/search/proxy/store" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/lifted" @@ -36,10 +34,6 @@ import ( const workKey = "key" -type connector interface { - connect(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) -} - // Controller syncs Cluster and GlobalResource. type Controller struct { restMapper meta.RESTMapper @@ -49,36 +43,47 @@ type Controller struct { clusterLister clusterlisters.ClusterLister registryLister searchlisters.ResourceRegistryLister worker util.AsyncWorker - store store.Cache + store store.Store - // proxy - karmadaProxy connector - clusterProxy connector - cacheProxy connector + proxy framework.Proxy +} + +// NewControllerOption is the Option for NewController(). +type NewControllerOption struct { + RestConfig *restclient.Config + RestMapper meta.RESTMapper + + KubeFactory informers.SharedInformerFactory + KarmadaFactory informerfactory.SharedInformerFactory + + MinRequestTimeout time.Duration + + OutOfTreeRegistry pluginruntime.Registry } // NewController create a controller for proxy -func NewController(restConfig *restclient.Config, restMapper meta.RESTMapper, - kubeFactory informers.SharedInformerFactory, karmadaFactory informerfactory.SharedInformerFactory, - minRequestTimeout time.Duration) (*Controller, error) { - kp, err := newKarmadaProxy(restConfig) +func NewController(option NewControllerOption) (*Controller, error) { + secretLister := option.KubeFactory.Core().V1().Secrets().Lister() + clusterLister := option.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister() + + multiClusterStore := newMultiClusterStore(clusterLister, secretLister, option.RestMapper) + + allPlugins, err := newPlugins(option, multiClusterStore) if err != nil { return nil, err } + proxy := pluginruntime.NewFramework(allPlugins) + ctl := &Controller{ - restMapper: restMapper, + restMapper: option.RestMapper, negotiatedSerializer: scheme.Codecs.WithoutConversion(), - secretLister: kubeFactory.Core().V1().Secrets().Lister(), - clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), - registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(), + secretLister: secretLister, + clusterLister: clusterLister, + registryLister: option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(), + store: multiClusterStore, + proxy: proxy, } - s := store.NewMultiClusterCache(ctl.dynamicClientForCluster, restMapper) - - ctl.store = s - ctl.cacheProxy = newCacheProxy(s, restMapper, minRequestTimeout) - ctl.clusterProxy = newClusterProxy(s, ctl.clusterLister, ctl.secretLister) - ctl.karmadaProxy = kp workerOptions := util.Options{ Name: "proxy-controller", @@ -99,12 +104,38 @@ func NewController(restConfig *restclient.Config, restMapper meta.RESTMapper, }, } - karmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler) - karmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler) + option.KarmadaFactory.Cluster().V1alpha1().Clusters().Informer().AddEventHandler(resourceEventHandler) + option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Informer().AddEventHandler(resourceEventHandler) return ctl, nil } +func newPlugins(option NewControllerOption, clusterStore store.Store) ([]framework.Plugin, error) { + pluginDependency := pluginruntime.PluginDependency{ + RestConfig: option.RestConfig, + RestMapper: option.RestMapper, + KubeFactory: option.KubeFactory, + KarmadaFactory: option.KarmadaFactory, + MinRequestTimeout: option.MinRequestTimeout, + Store: clusterStore, + } + + registry := plugins.NewInTreeRegistry() + registry.Merge(option.OutOfTreeRegistry) + + allPlugins := make([]framework.Plugin, 0, len(registry)) + for _, pluginFactory := range registry { + plugin, err := pluginFactory(pluginDependency) + if err != nil { + return nil, err + } + + allPlugins = append(allPlugins, plugin) + } + + return allPlugins, nil +} + // Start run the proxy controller func (ctl *Controller) Start(stopCh <-chan struct{}) { ctl.worker.Run(1, stopCh) @@ -160,6 +191,21 @@ func (ctl *Controller) reconcile(util.QueueKey) error { return ctl.store.UpdateCache(resourcesByClusters) } +type errorHTTPHandler struct { + requestInfo *request.RequestInfo + err error + negotiatedSerializer runtime.NegotiatedSerializer +} + +func (handler *errorHTTPHandler) ServeHTTP(delegate http.ResponseWriter, req *http.Request) { + // Write error into delegate ResponseWriter, wrapped in metrics.InstrumentHandlerFunc, so metrics can record this error. + gv := schema.GroupVersion{ + Group: handler.requestInfo.APIGroup, + Version: handler.requestInfo.Verb, + } + responsewriters.ErrorNegotiated(handler.err, handler.negotiatedSerializer, gv, delegate, req) +} + // Connect proxy and dispatch handlers func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder rest.Responder) (http.Handler, error) { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { @@ -177,106 +223,24 @@ func (ctl *Controller) Connect(ctx context.Context, proxyPath string, responder Resource: requestInfo.Resource, } - conn := ctl.connect(requestInfo) - h, err := conn.connect(newCtx, gvr, proxyPath, responder) + h, err := ctl.proxy.Connect(newCtx, framework.ProxyRequest{ + RequestInfo: requestInfo, + GroupVersionResource: gvr, + ProxyPath: proxyPath, + Responder: responder, + HTTPReq: newReq, + }) + if err != nil { - h = http.HandlerFunc(func(delegate http.ResponseWriter, req *http.Request) { - // Write error into delegate ResponseWriter, wrapped in metrics.InstrumentHandlerFunc, so metrics can record this error. - gv := schema.GroupVersion{ - Group: requestInfo.APIGroup, - Version: requestInfo.Verb, - } - responsewriters.ErrorNegotiated(err, ctl.negotiatedSerializer, gv, delegate, req) - }) + h = &errorHTTPHandler{ + requestInfo: requestInfo, + err: err, + negotiatedSerializer: ctl.negotiatedSerializer, + } } + h = metrics.InstrumentHandlerFunc(requestInfo.Verb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, "", "karmada-search", false, "", h.ServeHTTP) h.ServeHTTP(rw, newReq) }), nil } - -func (ctl *Controller) connect(requestInfo *request.RequestInfo) connector { - gvr := schema.GroupVersionResource{ - Group: requestInfo.APIGroup, - Version: requestInfo.APIVersion, - Resource: requestInfo.Resource, - } - - // requests will be redirected to: - // 1. karmada apiserver - // 2. cache - // 3. member clusters - // see more information from https://github.com/karmada-io/karmada/tree/master/docs/proposals/resource-aggregation-proxy#request-routing - - // 1. For non-resource requests, or resources are not defined in ResourceRegistry, - // we redirect the requests to karmada apiserver. - // Usually the request are - // - api index, e.g.: `/api`, `/apis` - // - to workload created in karmada controller panel, such as deployments and services. - if !requestInfo.IsResourceRequest || !ctl.store.HasResource(gvr) { - return ctl.karmadaProxy - } - - // 2. For reading requests, we redirect them to cache. - // Users call these requests to read resources in member clusters, such as pods and nodes. - if requestInfo.Subresource == "" && (requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch") { - return ctl.cacheProxy - } - - // 3. The remaining requests are: - // - writing resources. - // - or subresource requests, e.g. `pods/log` - // We firstly find the resource from cache, and get the located cluster. Then redirect the request to the cluster. - return ctl.clusterProxy -} - -// TODO: reuse with karmada/pkg/util/membercluster_client.go -func (ctl *Controller) dynamicClientForCluster(clusterName string) (dynamic.Interface, error) { - cluster, err := ctl.clusterLister.Get(clusterName) - if err != nil { - return nil, err - } - - apiEndpoint := cluster.Spec.APIEndpoint - if apiEndpoint == "" { - return nil, fmt.Errorf("the api endpoint of cluster %s is empty", clusterName) - } - - if cluster.Spec.SecretRef == nil { - return nil, fmt.Errorf("cluster %s does not have a secret", clusterName) - } - - secret, err := ctl.secretLister.Secrets(cluster.Spec.SecretRef.Namespace).Get(cluster.Spec.SecretRef.Name) - if err != nil { - return nil, err - } - - token, tokenFound := secret.Data[clusterv1alpha1.SecretTokenKey] - if !tokenFound || len(token) == 0 { - return nil, fmt.Errorf("the secret for cluster %s is missing a non-empty value for %q", clusterName, clusterv1alpha1.SecretTokenKey) - } - - clusterConfig, err := clientcmd.BuildConfigFromFlags(apiEndpoint, "") - if err != nil { - return nil, err - } - - clusterConfig.BearerToken = string(token) - - if cluster.Spec.InsecureSkipTLSVerification { - clusterConfig.TLSClientConfig.Insecure = true - } else { - clusterConfig.CAData = secret.Data[clusterv1alpha1.SecretCADataKey] - } - - if cluster.Spec.ProxyURL != "" { - proxy, err := url.Parse(cluster.Spec.ProxyURL) - if err != nil { - klog.Errorf("parse proxy error. %v", err) - return nil, err - } - clusterConfig.Proxy = http.ProxyURL(proxy) - } - - return dynamic.NewForConfig(clusterConfig) -} diff --git a/pkg/search/proxy/controller_test.go b/pkg/search/proxy/controller_test.go index f390bdf0aa49..720441ca0704 100644 --- a/pkg/search/proxy/controller_test.go +++ b/pkg/search/proxy/controller_test.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "errors" "fmt" "net/http" "net/http/httptest" @@ -13,14 +12,10 @@ import ( "time" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" - "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" @@ -31,24 +26,11 @@ import ( searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" - "github.com/karmada-io/karmada/pkg/search/proxy/store" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" + proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" ) -var ( - podGVK = corev1.SchemeGroupVersion.WithKind("Pod") - nodeGVK = corev1.SchemeGroupVersion.WithKind("Node") - - podSelector = searchv1alpha1.ResourceSelector{APIVersion: podGVK.GroupVersion().String(), Kind: podGVK.Kind} - nodeSelector = searchv1alpha1.ResourceSelector{APIVersion: nodeGVK.GroupVersion().String(), Kind: nodeGVK.Kind} - - restMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) -) - -func init() { - restMapper.Add(podGVK, meta.RESTScopeNamespace) - restMapper.Add(nodeGVK, meta.RESTScopeRoot) -} - func TestController(t *testing.T) { restConfig := &restclient.Config{ Host: "https://localhost:6443", @@ -59,7 +41,7 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "rr"}, Spec: searchv1alpha1.ResourceRegistrySpec{ ResourceSelectors: []searchv1alpha1.ResourceSelector{ - podSelector, + proxytest.PodSelector, }, }, } @@ -67,13 +49,14 @@ func TestController(t *testing.T) { kubeFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadafake.NewSimpleClientset(cluster1, rr), 0) - ctrl, err := NewController( - restConfig, - restMapper, - kubeFactory, - karmadaFactory, - 0, - ) + ctrl, err := NewController(NewControllerOption{ + RestConfig: restConfig, + RestMapper: proxytest.RestMapper, + KubeFactory: kubeFactory, + KarmadaFactory: karmadaFactory, + MinRequestTimeout: 0, + }) + if err != nil { t.Error(err) return @@ -95,7 +78,7 @@ func TestController(t *testing.T) { // wait for controller synced time.Sleep(time.Second) - hasPod := ctrl.store.HasResource(podGVR) + hasPod := ctrl.store.HasResource(proxytest.PodGVR) if !hasPod { t.Error("has no pod resource") return @@ -127,7 +110,7 @@ func TestController_reconcile(t *testing.T) { ClusterNames: []string{"cluster1"}, }, ResourceSelectors: []searchv1alpha1.ResourceSelector{ - podSelector, + proxytest.PodSelector, }, }, }, @@ -146,8 +129,8 @@ func TestController_reconcile(t *testing.T) { ClusterNames: []string{"cluster1", "cluster2"}, }, ResourceSelectors: []searchv1alpha1.ResourceSelector{ - podSelector, - nodeSelector, + proxytest.PodSelector, + proxytest.NodeSelector, }, }, }, @@ -166,14 +149,14 @@ func TestController_reconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "rr1"}, Spec: searchv1alpha1.ResourceRegistrySpec{ TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster1"}}, - ResourceSelectors: []searchv1alpha1.ResourceSelector{podSelector}, + ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.PodSelector}, }, }, &searchv1alpha1.ResourceRegistry{ ObjectMeta: metav1.ObjectMeta{Name: "rr2"}, Spec: searchv1alpha1.ResourceRegistrySpec{ TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster2"}}, - ResourceSelectors: []searchv1alpha1.ResourceSelector{nodeSelector}, + ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.NodeSelector}, }, }, }, @@ -191,14 +174,14 @@ func TestController_reconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "rr1"}, Spec: searchv1alpha1.ResourceRegistrySpec{ TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster1"}}, - ResourceSelectors: []searchv1alpha1.ResourceSelector{podSelector, nodeSelector}, + ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.PodSelector, proxytest.NodeSelector}, }, }, &searchv1alpha1.ResourceRegistry{ ObjectMeta: metav1.ObjectMeta{Name: "rr2"}, Spec: searchv1alpha1.ResourceRegistrySpec{ TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"cluster2"}}, - ResourceSelectors: []searchv1alpha1.ResourceSelector{nodeSelector}, + ResourceSelectors: []searchv1alpha1.ResourceSelector{proxytest.NodeSelector}, }, }, }, @@ -219,8 +202,8 @@ func TestController_reconcile(t *testing.T) { ClusterNames: []string{"cluster1"}, }, ResourceSelectors: []searchv1alpha1.ResourceSelector{ - podSelector, - podSelector, + proxytest.PodSelector, + proxytest.PodSelector, }, }, }, @@ -241,7 +224,7 @@ func TestController_reconcile(t *testing.T) { ClusterNames: []string{"cluster1"}, }, ResourceSelectors: []searchv1alpha1.ResourceSelector{ - podSelector, + proxytest.PodSelector, }, }, }, @@ -252,7 +235,7 @@ func TestController_reconcile(t *testing.T) { ClusterNames: []string{"cluster1"}, }, ResourceSelectors: []searchv1alpha1.ResourceSelector{ - podSelector, + proxytest.PodSelector, }, }, }, @@ -288,10 +271,10 @@ func TestController_reconcile(t *testing.T) { karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClientset, 0) ctl := &Controller{ - restMapper: restMapper, + restMapper: proxytest.RestMapper, clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(), - store: &cacheFuncs{ + store: &proxytest.MockStore{ UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]struct{}) error { for clusterName, resources := range m { resourceNames := make([]string, 0, len(resources)) @@ -324,148 +307,165 @@ func TestController_reconcile(t *testing.T) { } } -func TestController_Connect(t *testing.T) { - var karmadaProxying, clusterProxying, cacheProxying bool - ctl := &Controller{ - karmadaProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) { - return http.HandlerFunc(func(http.ResponseWriter, *http.Request) { - karmadaProxying = true - }), nil - }), - cacheProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) { - return http.HandlerFunc(func(http.ResponseWriter, *http.Request) { - cacheProxying = true - }), nil - }), - clusterProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) { - return http.HandlerFunc(func(http.ResponseWriter, *http.Request) { - clusterProxying = true - }), nil - }), - store: &cacheFuncs{ - HasResourceFunc: func(gvr schema.GroupVersionResource) bool { return gvr == podGVR }, - }, - } +type mockPlugin struct { + TheOrder int + IsSupportRequest bool + Called bool +} - type args struct { - path string +var _ framework.Plugin = (*mockPlugin)(nil) + +func (r *mockPlugin) Order() int { + return r.TheOrder +} + +func (r *mockPlugin) SupportRequest(request framework.ProxyRequest) bool { + return r.IsSupportRequest +} + +func (r *mockPlugin) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) { + return http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + r.Called = true + }), nil +} + +func convertPluginSlice(in []*mockPlugin) []framework.Plugin { + out := make([]framework.Plugin, 0, len(in)) + for _, plugin := range in { + out = append(out, plugin) } - type want struct { - karmadaProxying, clusterProxying, cacheProxying bool + + return out +} + +func TestController_Connect(t *testing.T) { + store := &proxytest.MockStore{ + HasResourceFunc: func(gvr schema.GroupVersionResource) bool { return gvr == proxytest.PodGVR }, } tests := []struct { - name string - args args - want want + name string + plugins []*mockPlugin + wantErr bool + wantCalled []bool }{ { - name: "get api from karmada", - args: args{ - path: "/api", - }, - want: want{ - karmadaProxying: true, - }, - }, - { - name: "get event api karmada", - args: args{ - path: "/apis/events.k8s.io/v1", - }, - want: want{ - karmadaProxying: true, - }, - }, - { - name: "list nodes from karmada", - args: args{ - path: "/api/v1/nodes", - }, - want: want{ - karmadaProxying: true, - }, - }, - { - name: "get node from karmada", - args: args{ - path: "/api/v1/nodes", - }, - want: want{ - karmadaProxying: true, - }, - }, - { - name: "list pod from cache", - args: args{ - path: "/api/v1/pods", - }, - want: want{ - cacheProxying: true, - }, - }, - { - name: "list pod from cache with namespace", - args: args{ - path: "/api/v1/namespaces/default/pods", - }, - want: want{ - cacheProxying: true, + name: "call first", + plugins: []*mockPlugin{ + { + TheOrder: 0, + IsSupportRequest: true, + }, + { + TheOrder: 1, + IsSupportRequest: true, + }, }, + wantErr: false, + wantCalled: []bool{true, false}, }, { - name: "get pod from cache", - args: args{ - path: "/api/v1/namespaces/default/pods/foo", - }, - want: want{ - cacheProxying: true, + name: "call second", + plugins: []*mockPlugin{ + { + TheOrder: 0, + IsSupportRequest: false, + }, + { + TheOrder: 1, + IsSupportRequest: true, + }, }, + wantErr: false, + wantCalled: []bool{false, true}, }, { - name: "get pod log from cluster", - args: args{ - path: "/api/v1/namespaces/default/pods/foo/log", - }, - want: want{ - clusterProxying: true, + name: "call fail", + plugins: []*mockPlugin{ + { + TheOrder: 0, + IsSupportRequest: false, + }, + { + TheOrder: 1, + IsSupportRequest: false, + }, }, + wantErr: true, + wantCalled: []bool{false, false}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - karmadaProxying, clusterProxying, cacheProxying = false, false, false - conn, err := ctl.Connect(context.TODO(), tt.args.path, nil) + ctl := &Controller{ + proxy: pluginruntime.NewFramework(convertPluginSlice(tt.plugins)), + negotiatedSerializer: scheme.Codecs.WithoutConversion(), + store: store, + } + + conn, err := ctl.Connect(context.TODO(), "/api/v1/pods", nil) if err != nil { - t.Error(err) - return + t.Fatal(err) } - req, err := http.NewRequest("GET", "/prefix"+tt.args.path, nil) + + req, err := http.NewRequest("GET", "/prefix/api/v1/pods", nil) if err != nil { - t.Error(err) - return + t.Fatal(err) } - conn.ServeHTTP(httptest.NewRecorder(), req) - if karmadaProxying != tt.want.karmadaProxying { - t.Errorf("karmadaProxying get = %v, want = %v", karmadaProxying, tt.want.karmadaProxying) + recorder := httptest.NewRecorder() + conn.ServeHTTP(recorder, req) + + response := recorder.Result() + fmt.Printf("response: %v", response) + + if (response.StatusCode != 200) != tt.wantErr { + t.Errorf("http request returned status code = %v, want error = %v", + response.StatusCode, tt.wantErr) } - if cacheProxying != tt.want.cacheProxying { - t.Errorf("cacheProxying get = %v, want = %v", cacheProxying, tt.want.cacheProxying) + + if len(tt.plugins) != len(tt.wantCalled) { + panic("len(tt.plugins) != len(tt.wantCalled), please fix test cases") } - if clusterProxying != tt.want.clusterProxying { - t.Errorf("clusterProxying get = %v, want = %v", clusterProxying, tt.want.clusterProxying) + + for i, n := 0, len(tt.plugins); i < n; i++ { + if tt.plugins[i].Called != tt.wantCalled[i] { + t.Errorf("plugin[%v].Called = %v, want = %v", i, tt.plugins[i].Called, tt.wantCalled[i]) + } } }) } } +type failPlugin struct{} + +var _ framework.Plugin = (*failPlugin)(nil) + +func (r *failPlugin) Order() int { + return 0 +} + +func (r *failPlugin) SupportRequest(request framework.ProxyRequest) bool { + return true +} + +func (r *failPlugin) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) { + return nil, fmt.Errorf("test") +} + func TestController_Connect_Error(t *testing.T) { + store := &proxytest.MockStore{ + HasResourceFunc: func(gvr schema.GroupVersionResource) bool { + return gvr == proxytest.PodGVR + }, + } + + plugins := []framework.Plugin{&failPlugin{}} + ctl := &Controller{ - karmadaProxy: connectFunc(func(context.Context, schema.GroupVersionResource, string, rest.Responder) (http.Handler, error) { - return nil, fmt.Errorf("test") - }), + proxy: pluginruntime.NewFramework(plugins), + store: store, negotiatedSerializer: scheme.Codecs.WithoutConversion(), } @@ -491,287 +491,8 @@ func TestController_Connect_Error(t *testing.T) { } } -func TestController_dynamicClientForCluster(t *testing.T) { - // copy from go/src/net/http/internal/testcert/testcert.go - testCA := []byte(`-----BEGIN CERTIFICATE----- -MIIDOTCCAiGgAwIBAgIQSRJrEpBGFc7tNb1fb5pKFzANBgkqhkiG9w0BAQsFADAS -MRAwDgYDVQQKEwdBY21lIENvMCAXDTcwMDEwMTAwMDAwMFoYDzIwODQwMTI5MTYw -MDAwWjASMRAwDgYDVQQKEwdBY21lIENvMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A -MIIBCgKCAQEA6Gba5tHV1dAKouAaXO3/ebDUU4rvwCUg/CNaJ2PT5xLD4N1Vcb8r -bFSW2HXKq+MPfVdwIKR/1DczEoAGf/JWQTW7EgzlXrCd3rlajEX2D73faWJekD0U -aUgz5vtrTXZ90BQL7WvRICd7FlEZ6FPOcPlumiyNmzUqtwGhO+9ad1W5BqJaRI6P -YfouNkwR6Na4TzSj5BrqUfP0FwDizKSJ0XXmh8g8G9mtwxOSN3Ru1QFc61Xyeluk -POGKBV/q6RBNklTNe0gI8usUMlYyoC7ytppNMW7X2vodAelSu25jgx2anj9fDVZu -h7AXF5+4nJS4AAt0n1lNY7nGSsdZas8PbQIDAQABo4GIMIGFMA4GA1UdDwEB/wQE -AwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MB0GA1Ud -DgQWBBStsdjh3/JCXXYlQryOrL4Sh7BW5TAuBgNVHREEJzAlggtleGFtcGxlLmNv -bYcEfwAAAYcQAAAAAAAAAAAAAAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAQEAxWGI -5NhpF3nwwy/4yB4i/CwwSpLrWUa70NyhvprUBC50PxiXav1TeDzwzLx/o5HyNwsv -cxv3HdkLW59i/0SlJSrNnWdfZ19oTcS+6PtLoVyISgtyN6DpkKpdG1cOkW3Cy2P2 -+tK/tKHRP1Y/Ra0RiDpOAmqn0gCOFGz8+lqDIor/T7MTpibL3IxqWfPrvfVRHL3B -grw/ZQTTIVjjh4JBSW3WyWgNo/ikC1lrVxzl4iPUGptxT36Cr7Zk2Bsg0XqwbOvK -5d+NTDREkSnUbie4GeutujmX3Dsx88UiV6UY/4lHJa6I5leHUNOHahRbpbWeOfs/ -WkBKOclmOV2xlTVuPw== ------END CERTIFICATE-----`) - - type args struct { - clusters []runtime.Object - secrets []runtime.Object - } - - type want struct { - err error - } - - tests := []struct { - name string - args args - want want - }{ - { - name: "cluster not found", - args: args{ - clusters: nil, - secrets: nil, - }, - want: want{ - err: apierrors.NewNotFound(schema.GroupResource{Resource: "cluster", Group: "cluster.karmada.io"}, "test"), - }, - }, - { - name: "api endpoint is empty", - args: args{ - clusters: []runtime.Object{&clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: clusterv1alpha1.ClusterSpec{}, - }}, - secrets: nil, - }, - want: want{ - err: errors.New("the api endpoint of cluster test is empty"), - }, - }, - { - name: "secret is empty", - args: args{ - clusters: []runtime.Object{&clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://localhost", - }, - }}, - secrets: nil, - }, - want: want{ - err: errors.New("cluster test does not have a secret"), - }, - }, - { - name: "secret not found", - args: args{ - clusters: []runtime.Object{&clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://localhost", - SecretRef: &clusterv1alpha1.LocalSecretReference{ - Namespace: "default", - Name: "test_secret", - }, - }, - }}, - secrets: nil, - }, - want: want{ - err: errors.New(`secret "test_secret" not found`), - }, - }, - { - name: "token not found", - args: args{ - clusters: []runtime.Object{&clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://localhost", - SecretRef: &clusterv1alpha1.LocalSecretReference{ - Namespace: "default", - Name: "test_secret", - }, - }, - }}, - secrets: []runtime.Object{&corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "test_secret", - }, - Data: map[string][]byte{}, - }}, - }, - want: want{ - err: errors.New(`the secret for cluster test is missing a non-empty value for "token"`), - }, - }, - { - name: "success", - args: args{ - clusters: []runtime.Object{&clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://localhost", - SecretRef: &clusterv1alpha1.LocalSecretReference{ - Namespace: "default", - Name: "test_secret", - }, - }, - }}, - secrets: []runtime.Object{&corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "test_secret", - }, - Data: map[string][]byte{ - clusterv1alpha1.SecretTokenKey: []byte("test_token"), - clusterv1alpha1.SecretCADataKey: testCA, - }, - }}, - }, - want: want{ - err: nil, - }, - }, - { - name: "has proxy", - args: args{ - clusters: []runtime.Object{&clusterv1alpha1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - Spec: clusterv1alpha1.ClusterSpec{ - APIEndpoint: "https://localhost", - SecretRef: &clusterv1alpha1.LocalSecretReference{ - Namespace: "default", - Name: "test_secret", - }, - ProxyURL: "https://localhost", - }, - }}, - secrets: []runtime.Object{&corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "test_secret", - }, - Data: map[string][]byte{ - clusterv1alpha1.SecretTokenKey: []byte("test_token"), - clusterv1alpha1.SecretCADataKey: testCA, - }, - }}, - }, - want: want{ - err: nil, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - kubeClient := fake.NewSimpleClientset(tt.args.secrets...) - kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0) - - karmadaClient := karmadafake.NewSimpleClientset(tt.args.clusters...) - karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadaClient, 0) - - ctrl := &Controller{ - clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), - secretLister: kubeFactory.Core().V1().Secrets().Lister(), - } - - stopCh := make(chan struct{}) - defer close(stopCh) - karmadaFactory.Start(stopCh) - karmadaFactory.WaitForCacheSync(stopCh) - kubeFactory.Start(stopCh) - kubeFactory.WaitForCacheSync(stopCh) - - client, err := ctrl.dynamicClientForCluster("test") - - if !errorEquals(err, tt.want.err) { - t.Errorf("got error %v, want %v", err, tt.want.err) - return - } - - if err != nil { - return - } - if client == nil { - t.Error("got client nil") - } - }) - } -} - -func errorEquals(a, b error) bool { - if a == b { - return true - } - if a == nil || b == nil { - return false - } - return a.Error() == b.Error() -} - -type connectFunc func(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) - -func (c connectFunc) connect(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) { - return c(ctx, gvr, proxyPath, responder) -} - func newCluster(name string) *clusterv1alpha1.Cluster { c := &clusterv1alpha1.Cluster{} c.Name = name return c } - -type cacheFuncs struct { - UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error - HasResourceFunc func(resource schema.GroupVersionResource) bool - GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) - StopFunc func() -} - -var _ store.Cache = &cacheFuncs{} - -func (c *cacheFuncs) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error { - if c.UpdateCacheFunc == nil { - panic("implement me") - } - return c.UpdateCacheFunc(resourcesByCluster) -} - -func (c *cacheFuncs) HasResource(resource schema.GroupVersionResource) bool { - if c.HasResourceFunc == nil { - panic("implement me") - } - return c.HasResourceFunc(resource) -} - -func (c *cacheFuncs) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { - if c.GetResourceFromCacheFunc == nil { - panic("implement me") - } - return c.GetResourceFromCacheFunc(ctx, gvr, namespace, name) -} - -func (c *cacheFuncs) Stop() { - if c.StopFunc != nil { - c.StopFunc() - } -} diff --git a/pkg/search/proxy/framework/interface.go b/pkg/search/proxy/framework/interface.go new file mode 100644 index 000000000000..7e03dd0f480e --- /dev/null +++ b/pkg/search/proxy/framework/interface.go @@ -0,0 +1,68 @@ +package framework + +import ( + "context" + "net/http" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" +) + +// Proxy connects to a backend +type Proxy interface { + // Connect returns a http.Handler to connect to backend. + Connect(ctx context.Context, request ProxyRequest) (http.Handler, error) +} + +// Plugin for the proxy plugin framework +type Plugin interface { + Proxy + + /* + * Order should return a constant int value. Smaller int value means higher priority. + * + * # Why order matters? + * + * The proxy plugin framework uses a variant of "Chain of Responsibility Pattern". + * There will be only one plugin selected. Smaller order value means this plugin has the chance to + * handle the request first. Only if the preceding plugin decide to not process the request, + * the next plugin can have the chance to process it. + * + * # Pattern Language Explanation + * + * "Chain of Responsibility Pattern" From "Design Patterns - Elements of Reusable Object-Oriented Software" + * + * Avoid coupling the sender of a request to its receiver by giving more than one object a chance + * to handle the request. Chain the receiving objects and pass the request along the chain until + * an object handles it. + * + * "Pipes and Filters Pattern" From "Pattern-Oriented Software Architecture Vol.1" + * + * The Pipes and Filters architectural pattern provides a structure for systems that process + * a stream of data. Each processing step is encapsulated in a filter component. Data is passed + * through pipes between adjacent filters. Recombining filters allows you to build families of + * related systems. + * + * Note the difference between "Chain of Responsibility Pattern" and "Pipes and Filters Pattern". + * Some other plugin frameworks in kubernetes use "Pipes and Filters Pattern", + * they will run multiple filters all together, so the order may not be as important as in + * "Chain of Responsibility Pattern" + */ + Order() int + + // SupportRequest returns true if this plugin support the request, false if not support. + // If this method return false, the request will skip this plugin. + SupportRequest(request ProxyRequest) bool +} + +// ProxyRequest holds parameter for Proxy.Connect() +type ProxyRequest struct { + RequestInfo *request.RequestInfo + GroupVersionResource schema.GroupVersionResource + ProxyPath string + + Responder rest.Responder + + HTTPReq *http.Request +} diff --git a/pkg/search/proxy/cache_proxy.go b/pkg/search/proxy/framework/plugins/cache/cache.go similarity index 59% rename from pkg/search/proxy/cache_proxy.go rename to pkg/search/proxy/framework/plugins/cache/cache.go index 92e02cd00f31..ac451ab6e410 100644 --- a/pkg/search/proxy/cache_proxy.go +++ b/pkg/search/proxy/framework/plugins/cache/cache.go @@ -1,4 +1,4 @@ -package proxy +package cache import ( "context" @@ -13,42 +13,66 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/handlers" - "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes/scheme" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" "github.com/karmada-io/karmada/pkg/search/proxy/store" ) -// cacheProxy caches resource from member clusters, and handle the read request(get/list/watch) for resources. -type cacheProxy struct { - store store.RESTReader +const ( + // We keep a big gap between in-tree plugins, to allow users to insert custom plugins between them. + order = 1000 +) + +// Cache caches resource from member clusters, and handle the read request(get/list/watch) for resources. +// For reading requests, we redirect them to cache. +// Users call these requests to read resources in member clusters, such as pods and nodes. +type Cache struct { + store store.Store restMapper meta.RESTMapper minRequestTimeout time.Duration } -func newCacheProxy(store store.RESTReader, restMapper meta.RESTMapper, minRequestTimeout time.Duration) *cacheProxy { - return &cacheProxy{ - store: store, - restMapper: restMapper, - minRequestTimeout: minRequestTimeout, - } +var _ framework.Plugin = (*Cache)(nil) + +// New creates an instance of Cache +func New(dep pluginruntime.PluginDependency) (framework.Plugin, error) { + return &Cache{ + store: dep.Store, + restMapper: dep.RestMapper, + minRequestTimeout: dep.MinRequestTimeout, + }, nil } -func (c *cacheProxy) connect(ctx context.Context, _ schema.GroupVersionResource, _ string, _ rest.Responder) (http.Handler, error) { - requestInfo, _ := request.RequestInfoFrom(ctx) - gvr := schema.GroupVersionResource{ - Group: requestInfo.APIGroup, - Version: requestInfo.APIVersion, - Resource: requestInfo.Resource, - } +// Order implements Plugin +func (c *Cache) Order() int { + return order +} + +// SupportRequest implements Plugin +func (c *Cache) SupportRequest(request framework.ProxyRequest) bool { + requestInfo := request.RequestInfo + + return requestInfo.IsResourceRequest && + c.store.HasResource(request.GroupVersionResource) && + requestInfo.Subresource == "" && + (requestInfo.Verb == "get" || + requestInfo.Verb == "list" || + requestInfo.Verb == "watch") +} + +// Connect implements Plugin +func (c *Cache) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) { + requestInfo := request.RequestInfo r := &rester{ store: c.store, - gvr: gvr, - tableConvertor: rest.NewDefaultTableConvertor(gvr.GroupResource()), + gvr: request.GroupVersionResource, + tableConvertor: rest.NewDefaultTableConvertor(request.GroupVersionResource.GroupResource()), } - gvk, err := c.restMapper.KindFor(gvr) + gvk, err := c.restMapper.KindFor(request.GroupVersionResource) if err != nil { return nil, err } @@ -59,7 +83,7 @@ func (c *cacheProxy) connect(ctx context.Context, _ schema.GroupVersionResource, scope := &handlers.RequestScope{ Kind: gvk, - Resource: gvr, + Resource: request.GroupVersionResource, Namer: &handlers.ContextBasedNaming{ Namer: meta.NewAccessor(), ClusterScoped: mapping.Scope.Name() == meta.RESTScopeNameRoot, @@ -81,7 +105,7 @@ func (c *cacheProxy) connect(ctx context.Context, _ schema.GroupVersionResource, } type rester struct { - store store.RESTReader + store store.Store gvr schema.GroupVersionResource tableConvertor rest.TableConvertor } diff --git a/pkg/search/proxy/cache_proxy_test.go b/pkg/search/proxy/framework/plugins/cache/cache_test.go similarity index 77% rename from pkg/search/proxy/cache_proxy_test.go rename to pkg/search/proxy/framework/plugins/cache/cache_test.go index 76509bb7686f..a54e104d3037 100644 --- a/pkg/search/proxy/cache_proxy_test.go +++ b/pkg/search/proxy/framework/plugins/cache/cache_test.go @@ -1,4 +1,4 @@ -package proxy +package cache import ( "context" @@ -10,7 +10,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -21,18 +20,11 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" - clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" - "github.com/karmada-io/karmada/pkg/search/proxy/store" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" "github.com/karmada-io/karmada/pkg/util/lifted" ) -var ( - podGVR = corev1.SchemeGroupVersion.WithResource("pods") - nodeGVR = corev1.SchemeGroupVersion.WithResource("nodes") - secretGVR = corev1.SchemeGroupVersion.WithResource("secret") - clusterGVR = clusterv1alpha1.SchemeGroupVersion.WithResource("cluster") -) - func TestCacheProxy_connect(t *testing.T) { type args struct { url string @@ -46,8 +38,8 @@ func TestCacheProxy_connect(t *testing.T) { } var actual want - p := &cacheProxy{ - store: &restReaderFuncs{ + p := &Cache{ + store: &proxytest.MockStore{ GetFunc: func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) { actual = want{} actual.namespace = request.NamespaceValue(ctx) @@ -76,7 +68,7 @@ func TestCacheProxy_connect(t *testing.T) { return w, nil }, }, - restMapper: restMapper, + restMapper: proxytest.RestMapper, } tests := []struct { name string @@ -90,7 +82,7 @@ func TestCacheProxy_connect(t *testing.T) { }, want: want{ name: "foo", - gvr: nodeGVR, + gvr: proxytest.NodeGVR, getOptions: &metav1.GetOptions{}, }, }, @@ -102,7 +94,7 @@ func TestCacheProxy_connect(t *testing.T) { want: want{ namespace: "default", name: "foo", - gvr: podGVR, + gvr: proxytest.PodGVR, getOptions: &metav1.GetOptions{}, }, }, @@ -114,7 +106,7 @@ func TestCacheProxy_connect(t *testing.T) { want: want{ namespace: "default", name: "foo", - gvr: podGVR, + gvr: proxytest.PodGVR, getOptions: &metav1.GetOptions{ResourceVersion: "1000"}, }, }, @@ -124,7 +116,7 @@ func TestCacheProxy_connect(t *testing.T) { url: "/api/v1/nodes", }, want: want{ - gvr: nodeGVR, + gvr: proxytest.NodeGVR, listOptions: &metainternalversion.ListOptions{}, }, }, @@ -135,7 +127,7 @@ func TestCacheProxy_connect(t *testing.T) { }, want: want{ namespace: "default", - gvr: podGVR, + gvr: proxytest.PodGVR, listOptions: &metainternalversion.ListOptions{}, }, }, @@ -146,7 +138,7 @@ func TestCacheProxy_connect(t *testing.T) { }, want: want{ namespace: "default", - gvr: podGVR, + gvr: proxytest.PodGVR, listOptions: &metainternalversion.ListOptions{ LabelSelector: asLabelSelector("app=foo"), FieldSelector: fields.OneTermEqualSelector("metadata.name", "bar"), @@ -161,7 +153,7 @@ func TestCacheProxy_connect(t *testing.T) { url: "/api/v1/nodes?watch=true", }, want: want{ - gvr: nodeGVR, + gvr: proxytest.NodeGVR, listOptions: &metainternalversion.ListOptions{ LabelSelector: labels.NewSelector(), FieldSelector: fields.Everything(), @@ -176,7 +168,7 @@ func TestCacheProxy_connect(t *testing.T) { }, want: want{ namespace: "default", - gvr: podGVR, + gvr: proxytest.PodGVR, listOptions: &metainternalversion.ListOptions{ LabelSelector: labels.NewSelector(), FieldSelector: fields.Everything(), @@ -191,7 +183,7 @@ func TestCacheProxy_connect(t *testing.T) { }, want: want{ namespace: "default", - gvr: podGVR, + gvr: proxytest.PodGVR, listOptions: &metainternalversion.ListOptions{ LabelSelector: asLabelSelector("app=foo"), FieldSelector: fields.OneTermEqualSelector("metadata.name", "bar"), @@ -219,7 +211,19 @@ func TestCacheProxy_connect(t *testing.T) { req = req.WithContext(request.WithNamespace(req.Context(), requestInfo.Namespace)) } - h, err := p.connect(req.Context(), podGVR, "", nil) + gvr := schema.GroupVersionResource{ + Group: requestInfo.APIGroup, + Version: requestInfo.APIVersion, + Resource: requestInfo.Resource, + } + + h, err := p.Connect(req.Context(), framework.ProxyRequest{ + RequestInfo: requestInfo, + GroupVersionResource: gvr, + ProxyPath: "", + Responder: nil, + HTTPReq: req, + }) if err != nil { t.Error(err) return @@ -244,35 +248,6 @@ func TestCacheProxy_connect(t *testing.T) { } } -type restReaderFuncs struct { - GetFunc func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) - ListFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) - WatchFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) -} - -var _ store.RESTReader = &restReaderFuncs{} - -func (r *restReaderFuncs) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) { - if r.GetFunc == nil { - panic("implement me") - } - return r.GetFunc(ctx, gvr, name, options) -} - -func (r *restReaderFuncs) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) { - if r.GetFunc == nil { - panic("implement me") - } - return r.ListFunc(ctx, gvr, options) -} - -func (r *restReaderFuncs) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) { - if r.GetFunc == nil { - panic("implement me") - } - return r.WatchFunc(ctx, gvr, options) -} - type emptyResponseWriter struct{} var _ http.ResponseWriter = &emptyResponseWriter{} diff --git a/pkg/search/proxy/cluster_proxy.go b/pkg/search/proxy/framework/plugins/cluster/cluster.go similarity index 50% rename from pkg/search/proxy/cluster_proxy.go rename to pkg/search/proxy/framework/plugins/cluster/cluster.go index 61e2cf50a593..105feb2dbe0c 100644 --- a/pkg/search/proxy/cluster_proxy.go +++ b/pkg/search/proxy/framework/plugins/cluster/cluster.go @@ -1,58 +1,76 @@ -package proxy +package cluster import ( "bytes" "context" - "fmt" "io" "io/ioutil" "net/http" - "path" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/registry/rest" listcorev1 "k8s.io/client-go/listers/core/v1" clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" "github.com/karmada-io/karmada/pkg/search/proxy/store" "github.com/karmada-io/karmada/pkg/util/proxy" ) -// clusterProxy proxy to member clusters -type clusterProxy struct { - store store.Cache +const ( + // We keep a big gap between in-tree plugins, to allow users to insert custom plugins between them. + order = 2000 +) + +// Cluster proxies the remaining requests to member clusters: +// - writing resources. +// - or subresource requests, e.g. `pods/log` +// We firstly find the resource from cache, and get the located cluster. Then redirect the request to the cluster. +type Cluster struct { + store store.Store clusterLister clusterlisters.ClusterLister secretLister listcorev1.SecretLister } -func newClusterProxy(store store.Cache, clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister) *clusterProxy { - return &clusterProxy{ - store: store, +var _ framework.Plugin = (*Cluster)(nil) + +// New creates an instance of Cluster +func New(dep pluginruntime.PluginDependency) (framework.Plugin, error) { + secretLister := dep.KubeFactory.Core().V1().Secrets().Lister() + clusterLister := dep.KarmadaFactory.Cluster().V1alpha1().Clusters().Lister() + + return &Cluster{ + store: dep.Store, clusterLister: clusterLister, secretLister: secretLister, - } + }, nil } -func (c *clusterProxy) connect(ctx context.Context, gvr schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) { - requestInfo, ok := request.RequestInfoFrom(ctx) - if !ok { - return nil, fmt.Errorf("missing requestInfo") - } +// Order implements Plugin +func (c *Cluster) Order() int { + return order +} + +// SupportRequest implements Plugin +func (c *Cluster) SupportRequest(request framework.ProxyRequest) bool { + return request.RequestInfo.IsResourceRequest && c.store.HasResource(request.GroupVersionResource) +} + +// Connect implements Plugin +func (c *Cluster) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) { + requestInfo := request.RequestInfo if requestInfo.Verb == "create" { - return nil, apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb) + return nil, apierrors.NewMethodNotSupported(request.GroupVersionResource.GroupResource(), requestInfo.Verb) } - _, clusterName, err := c.store.GetResourceFromCache(ctx, gvr, requestInfo.Namespace, requestInfo.Name) + _, clusterName, err := c.store.GetResourceFromCache(ctx, request.GroupVersionResource, requestInfo.Namespace, requestInfo.Name) if err != nil { return nil, err } - h, err := c.connectCluster(ctx, clusterName, proxyPath, responder) + h, err := proxy.ConnectCluster(ctx, c.clusterLister, c.secretLister, clusterName, request.ProxyPath, request.Responder) if err != nil { return nil, err } @@ -65,33 +83,13 @@ func (c *clusterProxy) connect(ctx context.Context, gvr schema.GroupVersionResou // So before update, we shall recover these fields. return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { if err = modifyRequest(req, clusterName); err != nil { - responder.Error(err) + request.Responder.Error(err) return } h.ServeHTTP(rw, req) }), nil } -func (c *clusterProxy) connectCluster(ctx context.Context, clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) { - cluster, err := c.clusterLister.Get(clusterName) - if err != nil { - return nil, err - } - location, transport, err := proxy.Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL) - if err != nil { - return nil, err - } - location.Path = path.Join(location.Path, proxyPath) - - secretGetter := func(context.Context, string) (*corev1.Secret, error) { - if cluster.Spec.ImpersonatorSecretRef == nil { - return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name) - } - return c.secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name) - } - return proxy.ConnectCluster(ctx, cluster.Name, location, transport, responder, secretGetter) -} - func modifyRequest(req *http.Request, cluster string) error { if req.ContentLength == 0 { return nil diff --git a/pkg/search/proxy/cluster_proxy_test.go b/pkg/search/proxy/framework/plugins/cluster/cluster_test.go similarity index 86% rename from pkg/search/proxy/cluster_proxy_test.go rename to pkg/search/proxy/framework/plugins/cluster/cluster_test.go index 47a0e4541938..d6cea5a89403 100644 --- a/pkg/search/proxy/cluster_proxy_test.go +++ b/pkg/search/proxy/framework/plugins/cluster/cluster_test.go @@ -1,4 +1,4 @@ -package proxy +package cluster import ( "bytes" @@ -30,7 +30,9 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" karmadafake "github.com/karmada-io/karmada/pkg/generated/clientset/versioned/fake" karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" "github.com/karmada-io/karmada/pkg/search/proxy/store" + proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" ) func TestModifyRequest(t *testing.T) { @@ -167,13 +169,13 @@ func Test_clusterProxy_connect(t *testing.T) { reqCtx := request.WithUser(context.TODO(), &user.DefaultInfo{}) type fields struct { - store store.Cache + store store.Store secrets []runtime.Object clusters []runtime.Object } type args struct { - ctx context.Context - request *http.Request + requestInfo *request.RequestInfo + request *http.Request } type want struct { err error @@ -185,37 +187,27 @@ func Test_clusterProxy_connect(t *testing.T) { args args want want }{ - { - name: "missing requestInfo", - fields: fields{}, - args: args{ - ctx: context.TODO(), - }, - want: want{ - err: errors.New("missing requestInfo"), - }, - }, { name: "create not supported", fields: fields{}, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "create"}), + requestInfo: &request.RequestInfo{Verb: "create"}, }, want: want{ - err: apierrors.NewMethodNotSupported(podGVR.GroupResource(), "create"), + err: apierrors.NewMethodNotSupported(proxytest.PodGVR.GroupResource(), "create"), }, }, { name: "get cache error", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "", errors.New("test error") }, }, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}), + requestInfo: &request.RequestInfo{Verb: "get"}, }, want: want{ err: errors.New("test error"), @@ -224,23 +216,23 @@ func Test_clusterProxy_connect(t *testing.T) { { name: "cluster not found", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "cluster1", nil }, }, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}), + requestInfo: &request.RequestInfo{Verb: "get"}, }, want: want{ - err: apierrors.NewNotFound(clusterGVR.GroupResource(), "cluster1"), + err: apierrors.NewNotFound(proxytest.ClusterGVR.GroupResource(), "cluster1"), }, }, { name: "API endpoint of cluster cluster1 should not be empty", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "cluster1", nil }, @@ -251,7 +243,7 @@ func Test_clusterProxy_connect(t *testing.T) { }}, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}), + requestInfo: &request.RequestInfo{Verb: "get"}, }, want: want{ err: errors.New("API endpoint of cluster cluster1 should not be empty"), @@ -260,7 +252,7 @@ func Test_clusterProxy_connect(t *testing.T) { { name: "impersonatorSecretRef is nil", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "cluster1", nil }, @@ -273,7 +265,7 @@ func Test_clusterProxy_connect(t *testing.T) { }}, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}), + requestInfo: &request.RequestInfo{Verb: "get"}, }, want: want{ err: errors.New("the impersonatorSecretRef of cluster cluster1 is nil"), @@ -282,7 +274,7 @@ func Test_clusterProxy_connect(t *testing.T) { { name: "secret not found", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "cluster1", nil }, @@ -299,16 +291,16 @@ func Test_clusterProxy_connect(t *testing.T) { }}, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}), + requestInfo: &request.RequestInfo{Verb: "get"}, }, want: want{ - err: apierrors.NewNotFound(secretGVR.GroupResource(), "secret"), + err: apierrors.NewNotFound(proxytest.SecretGVR.GroupResource(), "secret"), }, }, { name: "response ok", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "cluster1", nil }, @@ -334,8 +326,8 @@ func Test_clusterProxy_connect(t *testing.T) { }}, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "get"}), - request: makeRequest(reqCtx, "GET", "/test", nil), + requestInfo: &request.RequestInfo{Verb: "get"}, + request: makeRequest(reqCtx, "GET", "/test", nil), }, want: want{ err: nil, @@ -345,7 +337,7 @@ func Test_clusterProxy_connect(t *testing.T) { { name: "update error", fields: fields{ - store: &cacheFuncs{ + store: &proxytest.MockStore{ GetResourceFromCacheFunc: func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { return nil, "cluster1", nil }, @@ -371,7 +363,7 @@ func Test_clusterProxy_connect(t *testing.T) { }}, }, args: args{ - ctx: request.WithRequestInfo(context.TODO(), &request.RequestInfo{Verb: "update"}), + requestInfo: &request.RequestInfo{Verb: "update"}, request: (&http.Request{ Method: "PUT", URL: &url.URL{Scheme: "https", Host: "localhost", Path: "/test"}, @@ -393,7 +385,7 @@ func Test_clusterProxy_connect(t *testing.T) { kubeFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.fields.secrets...), 0) karmadaFactory := karmadainformers.NewSharedInformerFactory(karmadafake.NewSimpleClientset(tt.fields.clusters...), 0) - c := &clusterProxy{ + c := &Cluster{ store: tt.fields.store, clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), secretLister: kubeFactory.Core().V1().Secrets().Lister(), @@ -406,9 +398,15 @@ func Test_clusterProxy_connect(t *testing.T) { response := httptest.NewRecorder() - h, err := c.connect(tt.args.ctx, podGVR, "/proxy", newTestResponder(response)) - if !errorEquals(err, tt.want.err) { - t.Errorf("connect() error = %v, want %v", err, tt.want.err) + h, err := c.Connect(context.TODO(), framework.ProxyRequest{ + RequestInfo: tt.args.requestInfo, + GroupVersionResource: proxytest.PodGVR, + ProxyPath: "/proxy", + Responder: proxytest.NewResponder(response), + HTTPReq: tt.args.request, + }) + if !proxytest.ErrorMessageEquals(err, tt.want.err) { + t.Errorf("Connect() error = %v, want %v", err, tt.want.err) return } if err != nil { diff --git a/pkg/search/proxy/framework/plugins/karmada/karmada.go b/pkg/search/proxy/framework/plugins/karmada/karmada.go new file mode 100644 index 000000000000..0dbd9b077150 --- /dev/null +++ b/pkg/search/proxy/framework/plugins/karmada/karmada.go @@ -0,0 +1,80 @@ +package karmada + +import ( + "context" + "net/http" + "net/url" + "path" + + restclient "k8s.io/client-go/rest" + + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" + "github.com/karmada-io/karmada/pkg/util/proxy" +) + +const ( + // We keep a big gap between in-tree plugins, to allow users to insert custom plugins between them. + order = 3000 +) + +// Karmada proxies requests to karmada control panel. +// For non-resource requests, or resources are not defined in ResourceRegistry, +// we redirect the requests to karmada apiserver. +// Usually the request are +// - api index, e.g.: `/api`, `/apis` +// - to workload created in karmada controller panel, such as deployments and services. +type Karmada struct { + proxyLocation *url.URL + proxyTransport http.RoundTripper +} + +var _ framework.Plugin = (*Karmada)(nil) + +// New creates an instance of Karmada +func New(dep pluginruntime.PluginDependency) (framework.Plugin, error) { + location, err := url.Parse(dep.RestConfig.Host) + if err != nil { + return nil, err + } + + transport, err := restclient.TransportFor(dep.RestConfig) + if err != nil { + return nil, err + } + + return &Karmada{ + proxyLocation: location, + proxyTransport: transport, + }, nil +} + +// Order implements Plugin +func (p *Karmada) Order() int { + return order +} + +// SupportRequest implements Plugin +func (p *Karmada) SupportRequest(request framework.ProxyRequest) bool { + // This plugin's order is the last one. It's actually a fallback plugin. + // So we return true here to indicate we always support the request. + return true +} + +// Connect implements Plugin +func (p *Karmada) Connect(_ context.Context, request framework.ProxyRequest) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + location, transport := p.resourceLocation() + location.Path = path.Join(location.Path, request.ProxyPath) + location.RawQuery = req.URL.RawQuery + + handler := proxy.NewThrottledUpgradeAwareProxyHandler( + location, transport, true, false, request.Responder) + handler.ServeHTTP(rw, req) + }), nil +} + +func (p *Karmada) resourceLocation() (*url.URL, http.RoundTripper) { + location := *p.proxyLocation + return &location, p.proxyTransport +} diff --git a/pkg/search/proxy/karmada_proxy_test.go b/pkg/search/proxy/framework/plugins/karmada/karmada_test.go similarity index 70% rename from pkg/search/proxy/karmada_proxy_test.go rename to pkg/search/proxy/framework/plugins/karmada/karmada_test.go index 734a16d2bd8d..1c60b0818913 100644 --- a/pkg/search/proxy/karmada_proxy_test.go +++ b/pkg/search/proxy/framework/plugins/karmada/karmada_test.go @@ -1,15 +1,17 @@ -package proxy +package karmada import ( "context" - "encoding/json" "net/http" "net/http/httptest" "testing" "time" - "k8s.io/apimachinery/pkg/runtime" restclient "k8s.io/client-go/rest" + + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" + proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing" ) func Test_karmadaProxy(t *testing.T) { @@ -65,14 +67,17 @@ func Test_karmadaProxy(t *testing.T) { }, Timeout: time.Second * 1, } - p, err := newKarmadaProxy(restConfig) + p, err := New(pluginruntime.PluginDependency{RestConfig: restConfig}) if err != nil { t.Error(err) return } response := httptest.NewRecorder() - h, err := p.connect(context.TODO(), podGVR, tt.args.path, newTestResponder(response)) + h, err := p.Connect(context.TODO(), framework.ProxyRequest{ + ProxyPath: tt.args.path, + Responder: proxytest.NewResponder(response), + }) if err != nil { t.Error(err) return @@ -101,28 +106,3 @@ func Test_karmadaProxy(t *testing.T) { }) } } - -type testResponder struct { - resp *httptest.ResponseRecorder -} - -func newTestResponder(response *httptest.ResponseRecorder) *testResponder { - return &testResponder{ - resp: response, - } -} - -func (f *testResponder) Object(statusCode int, obj runtime.Object) { - f.resp.Code = statusCode - - if obj != nil { - err := json.NewEncoder(f.resp).Encode(obj) - if err != nil { - f.Error(err) - } - } -} - -func (f *testResponder) Error(err error) { - _, _ = f.resp.WriteString(err.Error()) -} diff --git a/pkg/search/proxy/framework/plugins/registry.go b/pkg/search/proxy/framework/plugins/registry.go new file mode 100644 index 000000000000..c5fab90416fd --- /dev/null +++ b/pkg/search/proxy/framework/plugins/registry.go @@ -0,0 +1,20 @@ +package plugins + +import ( + cacheplugin "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins/cache" + clusterplugin "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins/cluster" + karmadaplugin "github.com/karmada-io/karmada/pkg/search/proxy/framework/plugins/karmada" + pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime" +) + +// For detailed information of in tree plugins' execution order, please see: +// https://github.com/karmada-io/karmada/tree/master/docs/proposals/resource-aggregation-proxy#request-routing + +// NewInTreeRegistry builds the registry with all the in-tree plugins. +func NewInTreeRegistry() pluginruntime.Registry { + return pluginruntime.Registry{ + cacheplugin.New, + clusterplugin.New, + karmadaplugin.New, + } +} diff --git a/pkg/search/proxy/framework/runtime/framework.go b/pkg/search/proxy/framework/runtime/framework.go new file mode 100644 index 000000000000..4df8c201a20f --- /dev/null +++ b/pkg/search/proxy/framework/runtime/framework.go @@ -0,0 +1,51 @@ +package runtime + +import ( + "context" + "fmt" + "net/http" + "sort" + + "github.com/karmada-io/karmada/pkg/search/proxy/framework" +) + +// frameworkImpl select appropriate plugin to do `Connect()` +type frameworkImpl struct { + plugins []framework.Plugin +} + +// frameworkImpl is actually a Proxy +var _ framework.Proxy = (*frameworkImpl)(nil) + +// NewFramework create instance of framework.Proxy with determined order of Plugin. +func NewFramework(plugins []framework.Plugin) framework.Proxy { + sort.Slice(plugins, func(i, j int) bool { + return plugins[i].Order() < plugins[j].Order() + }) + + return &frameworkImpl{ + plugins: plugins, + } +} + +// Connect implements Proxy +func (c *frameworkImpl) Connect(ctx context.Context, request framework.ProxyRequest) (http.Handler, error) { + plugin, err := c.selectPlugin(request) + if err != nil { + return nil, err + } + + return plugin.Connect(ctx, request) +} + +// selectPlugin return an appropriate Plugin by query Plugin.SupportRequest in order. +func (c *frameworkImpl) selectPlugin(request framework.ProxyRequest) (framework.Plugin, error) { + for _, plugin := range c.plugins { + if plugin.SupportRequest(request) { + return plugin, nil + } + } + + return nil, fmt.Errorf("no plugin found for request: %v %v", + request.RequestInfo.Verb, request.RequestInfo.Path) +} diff --git a/pkg/search/proxy/framework/runtime/registry.go b/pkg/search/proxy/framework/runtime/registry.go new file mode 100644 index 000000000000..1aa949160126 --- /dev/null +++ b/pkg/search/proxy/framework/runtime/registry.go @@ -0,0 +1,44 @@ +package runtime + +import ( + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/informers" + "k8s.io/client-go/rest" + + "github.com/karmada-io/karmada/pkg/generated/informers/externalversions" + "github.com/karmada-io/karmada/pkg/search/proxy/framework" + "github.com/karmada-io/karmada/pkg/search/proxy/store" +) + +// PluginDependency holds dependency for plugins. It will be passed to PluginFactory when initializing Plugin. +type PluginDependency struct { + RestConfig *rest.Config + RestMapper meta.RESTMapper + + KubeFactory informers.SharedInformerFactory + KarmadaFactory externalversions.SharedInformerFactory + + MinRequestTimeout time.Duration + + Store store.Store +} + +// PluginFactory is the function to create a plugin. +type PluginFactory func(dep PluginDependency) (framework.Plugin, error) + +// Registry is a collection of all available plugins. The framework uses a +// registry to enable and initialize configured plugins. +// All plugins must be in the registry before initializing the framework. +type Registry []PluginFactory + +// Register adds a new plugin to the registry. +func (r *Registry) Register(factory PluginFactory) { + *r = append(*r, factory) +} + +// Merge merges the provided registry to the current one. +func (r *Registry) Merge(in Registry) { + *r = append(*r, in...) +} diff --git a/pkg/search/proxy/framework/runtime/registry_test.go b/pkg/search/proxy/framework/runtime/registry_test.go new file mode 100644 index 000000000000..ff8e1ed5f267 --- /dev/null +++ b/pkg/search/proxy/framework/runtime/registry_test.go @@ -0,0 +1,28 @@ +package runtime + +import ( + "testing" + + "github.com/karmada-io/karmada/pkg/search/proxy/framework" +) + +func emptyPluginFactory(dep PluginDependency) (framework.Plugin, error) { + return nil, nil +} + +func TestRegistry_Register(t *testing.T) { + // test nil slice + var nilSlice Registry + + t.Logf("nilSlice: %v, len: %v, cap: %v\n", nilSlice, len(nilSlice), cap(nilSlice)) + + nilSlice.Register(emptyPluginFactory) + + // no panic + + t.Logf("nilSlice: %v, len: %v, cap: %v\n", nilSlice, len(nilSlice), cap(nilSlice)) + + if len(nilSlice) != 1 { + t.Fatalf("slice len = %v, expected = 1", len(nilSlice)) + } +} diff --git a/pkg/search/proxy/karmada_proxy.go b/pkg/search/proxy/karmada_proxy.go deleted file mode 100644 index fa33b4a62553..000000000000 --- a/pkg/search/proxy/karmada_proxy.go +++ /dev/null @@ -1,53 +0,0 @@ -package proxy - -import ( - "context" - "net/http" - "net/url" - "path" - - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apiserver/pkg/registry/rest" - restclient "k8s.io/client-go/rest" - - "github.com/karmada-io/karmada/pkg/util/proxy" -) - -// karmadaProxy is proxy for karmada control panel -type karmadaProxy struct { - proxyLocation *url.URL - proxyTransport http.RoundTripper -} - -func newKarmadaProxy(restConfig *restclient.Config) (*karmadaProxy, error) { - location, err := url.Parse(restConfig.Host) - if err != nil { - return nil, err - } - transport, err := restclient.TransportFor(restConfig) - if err != nil { - return nil, err - } - - return &karmadaProxy{ - proxyLocation: location, - proxyTransport: transport, - }, nil -} - -// connect to Karmada-ApiServer directly -func (p *karmadaProxy) connect(_ context.Context, _ schema.GroupVersionResource, proxyPath string, responder rest.Responder) (http.Handler, error) { - return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - location, transport := p.resourceLocation() - location.Path = path.Join(location.Path, proxyPath) - location.RawQuery = req.URL.RawQuery - - handler := proxy.NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder) - handler.ServeHTTP(rw, req) - }), nil -} - -func (p *karmadaProxy) resourceLocation() (*url.URL, http.RoundTripper) { - location := *p.proxyLocation - return &location, p.proxyTransport -} diff --git a/pkg/search/proxy/store/multi_cluster_cache.go b/pkg/search/proxy/store/multi_cluster_cache.go index 29eb2b05cb80..e140ca809716 100644 --- a/pkg/search/proxy/store/multi_cluster_cache.go +++ b/pkg/search/proxy/store/multi_cluster_cache.go @@ -21,16 +21,13 @@ import ( "k8s.io/klog/v2" ) -// Cache an interface for cache. -type Cache interface { +// Store is the cache for resources from multiple member clusters +type Store interface { UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error HasResource(resource schema.GroupVersionResource) bool GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) Stop() -} -// RESTReader supports get/list/watch rest. -type RESTReader interface { Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) @@ -46,8 +43,7 @@ type MultiClusterCache struct { newClientFunc func(string) (dynamic.Interface, error) } -var _ Cache = &MultiClusterCache{} -var _ RESTReader = &MultiClusterCache{} +var _ Store = &MultiClusterCache{} // NewMultiClusterCache return a cache for resources from member clusters func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache { diff --git a/pkg/search/proxy/testing/constant.go b/pkg/search/proxy/testing/constant.go new file mode 100644 index 000000000000..67c23e604ced --- /dev/null +++ b/pkg/search/proxy/testing/constant.go @@ -0,0 +1,32 @@ +package testing + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1" +) + +// variables for test +var ( + PodGVK = corev1.SchemeGroupVersion.WithKind("Pod") + NodeGVK = corev1.SchemeGroupVersion.WithKind("Node") + + PodGVR = corev1.SchemeGroupVersion.WithResource("pods") + NodeGVR = corev1.SchemeGroupVersion.WithResource("nodes") + SecretGVR = corev1.SchemeGroupVersion.WithResource("secret") + ClusterGVR = clusterv1alpha1.SchemeGroupVersion.WithResource("cluster") + + PodSelector = searchv1alpha1.ResourceSelector{APIVersion: PodGVK.GroupVersion().String(), Kind: PodGVK.Kind} + NodeSelector = searchv1alpha1.ResourceSelector{APIVersion: NodeGVK.GroupVersion().String(), Kind: NodeGVK.Kind} + + RestMapper *meta.DefaultRESTMapper +) + +func init() { + RestMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + RestMapper.Add(PodGVK, meta.RESTScopeNamespace) + RestMapper.Add(NodeGVK, meta.RESTScopeRoot) +} diff --git a/pkg/search/proxy/testing/function.go b/pkg/search/proxy/testing/function.go new file mode 100644 index 000000000000..b350545100da --- /dev/null +++ b/pkg/search/proxy/testing/function.go @@ -0,0 +1,17 @@ +package testing + +// ErrorMessageEquals compare if two error message is equal. +// 1. nil error == nil error +// 2. nil error != non nil error +// 3. Other wise compare error.Error() returned string +func ErrorMessageEquals(a, b error) bool { + if a == nil && b == nil { + return true + } + + if a == nil || b == nil { + return false + } + + return a.Error() == b.Error() +} diff --git a/pkg/search/proxy/testing/mock_responder.go b/pkg/search/proxy/testing/mock_responder.go new file mode 100644 index 000000000000..e24bac2e016e --- /dev/null +++ b/pkg/search/proxy/testing/mock_responder.go @@ -0,0 +1,37 @@ +package testing + +import ( + "encoding/json" + "net/http/httptest" + + "k8s.io/apimachinery/pkg/runtime" +) + +// MockResponder is a mock for `k8s.io/apiserver/pkg/registry/rest/rest.go:292 => Responder interface` +type MockResponder struct { + resp *httptest.ResponseRecorder +} + +// NewResponder creates an instance of MockResponder +func NewResponder(response *httptest.ResponseRecorder) *MockResponder { + return &MockResponder{ + resp: response, + } +} + +// Object implements Responder interface +func (f *MockResponder) Object(statusCode int, obj runtime.Object) { + f.resp.Code = statusCode + + if obj != nil { + err := json.NewEncoder(f.resp).Encode(obj) + if err != nil { + f.Error(err) + } + } +} + +// Error implements Responder interface +func (f *MockResponder) Error(err error) { + _, _ = f.resp.WriteString(err.Error()) +} diff --git a/pkg/search/proxy/testing/mock_store.go b/pkg/search/proxy/testing/mock_store.go new file mode 100644 index 000000000000..7cf14c43d920 --- /dev/null +++ b/pkg/search/proxy/testing/mock_store.go @@ -0,0 +1,84 @@ +package testing + +import ( + "context" + + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + + "github.com/karmada-io/karmada/pkg/search/proxy/store" +) + +// MockStore is a mock for store.Store interface +type MockStore struct { + UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error + HasResourceFunc func(resource schema.GroupVersionResource) bool + GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) + StopFunc func() + GetFunc func(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) + ListFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) + WatchFunc func(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) +} + +var _ store.Store = &MockStore{} + +// UpdateCache implements store.Store interface +func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error { + if c.UpdateCacheFunc == nil { + panic("implement me") + } + return c.UpdateCacheFunc(resourcesByCluster) +} + +// HasResource implements store.Store interface +func (c *MockStore) HasResource(resource schema.GroupVersionResource) bool { + if c.HasResourceFunc == nil { + panic("implement me") + } + return c.HasResourceFunc(resource) +} + +// GetResourceFromCache implements store.Store interface +func (c *MockStore) GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) { + if c.GetResourceFromCacheFunc == nil { + panic("implement me") + } + return c.GetResourceFromCacheFunc(ctx, gvr, namespace, name) +} + +// Stop implements store.Store interface +func (c *MockStore) Stop() { + if c.StopFunc != nil { + c.StopFunc() + } +} + +// Get implements store.Store interface +func (c *MockStore) Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error) { + if c.GetFunc == nil { + panic("implement me") + } + + return c.GetFunc(ctx, gvr, name, options) +} + +// List implements store.Store interface +func (c *MockStore) List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error) { + if c.ListFunc == nil { + panic("implement me") + } + + return c.ListFunc(ctx, gvr, options) +} + +// Watch implements store.Store interface +func (c *MockStore) Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error) { + if c.WatchFunc == nil { + panic("implement me") + } + + return c.WatchFunc(ctx, gvr, options) +} diff --git a/pkg/util/proxy/proxy.go b/pkg/util/proxy/proxy.go index 50d90a0507af..10094e2a3ad3 100644 --- a/pkg/util/proxy/proxy.go +++ b/pkg/util/proxy/proxy.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/url" + "path" authenticationv1 "k8s.io/api/authentication/v1" corev1 "k8s.io/api/core/v1" @@ -16,18 +17,38 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" + listcorev1 "k8s.io/client-go/listers/core/v1" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" + clusterlisters "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1" ) -// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning -// errors to the caller. -func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler { - return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) -} +// todo: consider share logic with pkg/registry/cluster/storage/proxy.go:53 // ConnectCluster returns a handler for proxy cluster. -func ConnectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder, +func ConnectCluster(ctx context.Context, + clusterLister clusterlisters.ClusterLister, secretLister listcorev1.SecretLister, + clusterName string, proxyPath string, responder rest.Responder) (http.Handler, error) { + cluster, err := clusterLister.Get(clusterName) + if err != nil { + return nil, err + } + location, transport, err := Location(cluster.Name, cluster.Spec.APIEndpoint, cluster.Spec.ProxyURL) + if err != nil { + return nil, err + } + location.Path = path.Join(location.Path, proxyPath) + + secretGetter := func(context.Context, string) (*corev1.Secret, error) { + if cluster.Spec.ImpersonatorSecretRef == nil { + return nil, fmt.Errorf("the impersonatorSecretRef of cluster %s is nil", cluster.Name) + } + return secretLister.Secrets(cluster.Spec.ImpersonatorSecretRef.Namespace).Get(cluster.Spec.ImpersonatorSecretRef.Name) + } + return connectCluster(ctx, cluster.Name, location, transport, responder, secretGetter) +} + +func connectCluster(ctx context.Context, clusterName string, location *url.URL, transport http.RoundTripper, responder rest.Responder, impersonateSecretGetter func(context.Context, string) (*corev1.Secret, error)) (http.Handler, error) { secret, err := impersonateSecretGetter(ctx, clusterName) if err != nil { @@ -42,6 +63,12 @@ func ConnectCluster(ctx context.Context, clusterName string, location *url.URL, return newProxyHandler(location, transport, impersonateToken, responder) } +// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning +// errors to the caller. +func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler { + return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) +} + // Location returns a URL to which one can send traffic for the specified cluster. func Location(clusterName string, apiEndpoint string, proxyURL string) (*url.URL, http.RoundTripper, error) { location, err := constructLocation(clusterName, apiEndpoint)