Skip to content

Commit

Permalink
Add proxy framework.
Browse files Browse the repository at this point in the history
Signed-off-by: raymondmiaochaoyue <[email protected]>
  • Loading branch information
cmicat committed Oct 31, 2022
1 parent bc01953 commit 7e24783
Show file tree
Hide file tree
Showing 24 changed files with 1,401 additions and 954 deletions.
172 changes: 170 additions & 2 deletions cmd/karmada-search/app/karmada-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,46 @@ package app

import (
"context"
"fmt"
"net"
"net/http"
"path"
"time"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/karmada-io/karmada/cmd/karmada-search/app/options"
searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/search"
"github.com/karmada-io/karmada/pkg/search/proxy"
"github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/sharedcli"
"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util/lifted"
"github.com/karmada-io/karmada/pkg/version"
"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)

// Option configures a framework.Registry.
type Option func(*runtime.Registry)

// NewKarmadaSearchCommand creates a *cobra.Command object with default parameters
func NewKarmadaSearchCommand(ctx context.Context) *cobra.Command {
func NewKarmadaSearchCommand(ctx context.Context, registryOptions ...Option) *cobra.Command {
opts := options.NewOptions()

cmd := &cobra.Command{
Expand All @@ -28,7 +55,7 @@ capabilities such as global search and resource proxy in a multi-cloud environme
if err := opts.Validate(); err != nil {
return err
}
if err := opts.Run(ctx); err != nil {
if err := run(ctx, opts, registryOptions...); err != nil {
return err
}
return nil
Expand All @@ -52,3 +79,144 @@ capabilities such as global search and resource proxy in a multi-cloud environme
sharedcli.SetUsageAndHelpFunc(cmd, fss, cols)
return cmd
}

// WithPlugin creates an Option based on plugin factory.
// Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the karmada-search code base.
func WithPlugin(factory runtime.PluginFactory) Option {
return func(registry *runtime.Registry) {
registry.Register(factory)
}
}

// `run` runs the karmada-search with options. This should never exit.
func run(ctx context.Context, o *options.Options, registryOptions ...Option) error {
klog.Infof("karmada-search version: %s", version.Get())

profileflag.ListenAndServe(o.ProfileOpts)

config, err := config(o, registryOptions...)
if err != nil {
return err
}

server, err := config.Complete().New()
if err != nil {
return err
}

server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-informers", func(context genericapiserver.PostStartHookContext) error {
config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KarmadaSharedInformerFactory.Start(context.StopCh)
return nil
})

if config.ExtraConfig.Controller != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
config.ExtraConfig.Controller.Start(context.StopCh)
return nil
})
}

if config.ExtraConfig.ProxyController != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.ProxyController.Start(context.StopCh)
return nil
})

server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error {
config.ExtraConfig.ProxyController.Stop()
return nil
})
}

return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}

// `config` returns config for the api server given Options
func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Config, error) {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}

o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false}

serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs)
serverConfig.LongRunningFunc = customLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"))
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme))
serverConfig.OpenAPIConfig.Info.Title = "karmada-search"
if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err
}

serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst

restMapper, err := apiutil.NewDynamicRESTMapper(serverConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}

karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)

var ctl *search.Controller
if !o.DisableSearch {
ctl, err = search.NewController(serverConfig.ClientConfig, factory, restMapper)
if err != nil {
return nil, err
}
}

var proxyCtl *proxy.Controller
if !o.DisableProxy {
outOfTreeRegistry := make(runtime.Registry, 0, len(outOfTreeRegistryOptions))
for _, option := range outOfTreeRegistryOptions {
option(&outOfTreeRegistry)
}

proxyCtl, err = proxy.NewController(proxy.NewControllerOption{
RestConfig: serverConfig.ClientConfig,
RestMapper: restMapper,
KubeFactory: serverConfig.SharedInformerFactory,
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
OutOfTreeRegistry: outOfTreeRegistry,
})

if err != nil {
return nil, err
}
}

config := &search.Config{
GenericConfig: serverConfig,
ExtraConfig: search.ExtraConfig{
KarmadaSharedInformerFactory: factory,
Controller: ctl,
ProxyController: proxyCtl,
},
}
return config, nil
}

func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" {
reqClone := r.Clone(context.TODO())
// requestInfo.Parts is like [proxying foo proxy api v1 nodes]
reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...)
requestInfo = lifted.NewRequestInfo(reqClone)
}
return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo)
}
}
144 changes: 1 addition & 143 deletions cmd/karmada-search/app/options/options.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,21 @@
package options

import (
"context"
"fmt"
"net"
"net/http"
"path"
"time"

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
searchv1alpha1 "github.com/karmada-io/karmada/pkg/apis/search/v1alpha1"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/search"
"github.com/karmada-io/karmada/pkg/search/proxy"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util/lifted"
"github.com/karmada-io/karmada/pkg/version"
)

const defaultEtcdPathPrefix = "/registry"

// Options contains everything necessary to create and run karmada-search.
// Options contains command line parameters for karmada-search.
type Options struct {
RecommendedOptions *genericoptions.RecommendedOptions

Expand Down Expand Up @@ -83,123 +61,3 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
func (o *Options) Complete() error {
return nil
}

// Run runs the aggregated-apiserver with options. This should never exit.
func (o *Options) Run(ctx context.Context) error {
klog.Infof("karmada-search version: %s", version.Get())

profileflag.ListenAndServe(o.ProfileOpts)

config, err := o.Config()
if err != nil {
return err
}

server, err := config.Complete().New()
if err != nil {
return err
}

server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-informers", func(context genericapiserver.PostStartHookContext) error {
config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KarmadaSharedInformerFactory.Start(context.StopCh)
return nil
})

if config.ExtraConfig.Controller != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
config.ExtraConfig.Controller.Start(context.StopCh)
return nil
})
}

if config.ExtraConfig.ProxyController != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.ProxyController.Start(context.StopCh)
return nil
})

server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error {
config.ExtraConfig.ProxyController.Stop()
return nil
})
}

return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}

// Config returns config for the api server given Options
func (o *Options) Config() (*search.Config, error) {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}

o.RecommendedOptions.Features = &genericoptions.FeatureOptions{EnableProfiling: false}

serverConfig := genericapiserver.NewRecommendedConfig(searchscheme.Codecs)
serverConfig.LongRunningFunc = customLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"))
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(searchscheme.Scheme))
serverConfig.OpenAPIConfig.Info.Title = "karmada-search"
if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err
}

serverConfig.ClientConfig.QPS = o.KubeAPIQPS
serverConfig.ClientConfig.Burst = o.KubeAPIBurst

restMapper, err := apiutil.NewDynamicRESTMapper(serverConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}

karmadaClient := karmadaclientset.NewForConfigOrDie(serverConfig.ClientConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)

var ctl *search.Controller
if !o.DisableSearch {
ctl, err = search.NewController(serverConfig.ClientConfig, factory, restMapper)
if err != nil {
return nil, err
}
}

var proxyCtl *proxy.Controller
if !o.DisableProxy {
proxyCtl, err = proxy.NewController(serverConfig.ClientConfig, restMapper, serverConfig.SharedInformerFactory, factory,
time.Second*time.Duration(serverConfig.Config.MinRequestTimeout))
if err != nil {
return nil, err
}
}

config := &search.Config{
GenericConfig: serverConfig,
ExtraConfig: search.ExtraConfig{
KarmadaSharedInformerFactory: factory,
Controller: ctl,
ProxyController: proxyCtl,
},
}
return config, nil
}

func customLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) request.LongRunningRequestCheck {
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
if requestInfo.APIGroup == "search.karmada.io" && requestInfo.Resource == "proxying" {
reqClone := r.Clone(context.TODO())
// requestInfo.Parts is like [proxying foo proxy api v1 nodes]
reqClone.URL.Path = "/" + path.Join(requestInfo.Parts[3:]...)
requestInfo = lifted.NewRequestInfo(reqClone)
}
return genericfilters.BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources)(r, requestInfo)
}
}
Loading

0 comments on commit 7e24783

Please sign in to comment.