Skip to content

Commit

Permalink
Merge pull request #5846 from XiShanYongYe-Chang/adjust-with-readines…
Browse files Browse the repository at this point in the history
…sCheck

Wait for search multiClusterCache ReadinessCheck success
  • Loading branch information
karmada-bot authored Nov 22, 2024
2 parents 9416c08 + 9367ca9 commit 079d0ab
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 30 deletions.
15 changes: 9 additions & 6 deletions cmd/karmada-search/app/karmada-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func run(ctx context.Context, o *options.Options, registryOptions ...Option) err
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("search-storage-cache-readiness", config.ExtraConfig.ProxyController.Hook)

if config.ExtraConfig.Controller != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
Expand Down Expand Up @@ -210,12 +212,13 @@ func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Con
}

proxyCtl, err = proxy.NewController(proxy.NewControllerOption{
RestConfig: serverConfig.ClientConfig,
RestMapper: restMapper,
KubeFactory: serverConfig.SharedInformerFactory,
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
OutOfTreeRegistry: outOfTreeRegistry,
RestConfig: serverConfig.ClientConfig,
RestMapper: restMapper,
KubeFactory: serverConfig.SharedInformerFactory,
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
StorageInitializationTimeout: serverConfig.StorageInitializationTimeout,
OutOfTreeRegistry: outOfTreeRegistry,
})

if err != nil {
Expand Down
38 changes: 21 additions & 17 deletions cmd/karmada-search/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"

searchscheme "github.com/karmada-io/karmada/pkg/apis/search/scheme"
Expand All @@ -35,13 +34,14 @@ const defaultEtcdPathPrefix = "/registry"

// Options contains command line parameters for karmada-search.
type Options struct {
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
ServerRunOptions *genericoptions.ServerRunOptions

// KubeAPIQPS is the QPS to use while talking with karmada-search.
KubeAPIQPS float32
Expand All @@ -57,13 +57,14 @@ type Options struct {
// NewOptions returns a new Options.
func NewOptions() *Options {
o := &Options{
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(defaultEtcdPathPrefix, searchscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: searchv1alpha1.GroupVersion.Group, Version: searchv1alpha1.GroupVersion.Version}))),
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(defaultEtcdPathPrefix, searchscheme.Codecs.LegacyCodec(schema.GroupVersion{Group: searchv1alpha1.GroupVersion.Group, Version: searchv1alpha1.GroupVersion.Version}))),
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
ServerRunOptions: genericoptions.NewServerRunOptions(),
}
o.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(schema.GroupVersion{Group: searchv1alpha1.GroupVersion.Group, Version: searchv1alpha1.GroupVersion.Version},
schema.GroupKind{Group: searchv1alpha1.GroupName})
Expand All @@ -79,6 +80,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.Audit.AddFlags(flags)
o.Features.AddFlags(flags)
o.CoreAPI.AddFlags(flags)
o.ServerRunOptions.AddUniversalFlags(flags)

flags.Lookup("kubeconfig").Usage = "Path to karmada control plane kubeconfig file."

Expand All @@ -87,7 +89,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.DisableSearch, "disable-search", false, "Disable search feature that would save memory usage significantly.")
flags.BoolVar(&o.DisableProxy, "disable-proxy", false, "Disable proxy feature that would save memory usage significantly.")

utilfeature.DefaultMutableFeatureGate.AddFlag(flags)
o.ProfileOpts.AddFlags(flags)
}

Expand All @@ -111,6 +112,9 @@ func (o *Options) ApplyTo(config *genericapiserver.RecommendedConfig) error {
if err := o.CoreAPI.ApplyTo(config); err != nil {
return err
}
if err := o.ServerRunOptions.ApplyTo(&config.Config); err != nil {
return err
}
kubeClient, err := kubernetes.NewForConfig(config.ClientConfig)
if err != nil {
return err
Expand All @@ -123,5 +127,5 @@ func (o *Options) ApplyTo(config *genericapiserver.RecommendedConfig) error {

// Complete fills in fields required to have valid data.
func (o *Options) Complete() error {
return nil
return o.ServerRunOptions.Complete()
}
1 change: 1 addition & 0 deletions cmd/karmada-search/app/options/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ func (o *Options) Validate() error {
errs = append(errs, o.Audit.Validate()...)
errs = append(errs, o.Features.Validate()...)
errs = append(errs, o.CoreAPI.Validate()...)
errs = append(errs, o.ServerRunOptions.Validate()...)
return utilerrors.NewAggregate(errs)
}
49 changes: 42 additions & 7 deletions pkg/search/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package proxy

import (
"context"
"errors"
"net/http"
"time"

Expand All @@ -26,10 +27,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -66,6 +69,8 @@ type Controller struct {
store store.Store

proxy framework.Proxy

storageInitializationTimeout time.Duration
}

// NewControllerOption is the Option for NewController().
Expand All @@ -77,6 +82,9 @@ type NewControllerOption struct {
KarmadaFactory informerfactory.SharedInformerFactory

MinRequestTimeout time.Duration
// StorageInitializationTimeout defines the maximum amount of time to wait for storage initialization
// before declaring apiserver ready.
StorageInitializationTimeout time.Duration

OutOfTreeRegistry pluginruntime.Registry
}
Expand All @@ -97,13 +105,14 @@ func NewController(option NewControllerOption) (*Controller, error) {
proxy := pluginruntime.NewFramework(allPlugins)

ctl := &Controller{
restMapper: option.RestMapper,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
secretLister: secretLister,
clusterLister: clusterLister,
registryLister: option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: multiClusterStore,
proxy: proxy,
restMapper: option.RestMapper,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
secretLister: secretLister,
clusterLister: clusterLister,
registryLister: option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: multiClusterStore,
storageInitializationTimeout: option.StorageInitializationTimeout,
proxy: proxy,
}

workerOptions := util.Options{
Expand Down Expand Up @@ -312,3 +321,29 @@ func dynamicClientForClusterFunc(clusterLister clusterlisters.ClusterLister,
return dynamic.NewForConfig(clusterConfig)
}
}

func (ctl *Controller) storageReadinessCheck() bool {
return ctl.store.ReadinessCheck() == nil
}

// Hook waits for the controller to be in a storage ready state.
// Here, even if the initialization is not completed within the timeout interval,
// nil is still returned because the cache is per-type and per-cluster layer,
// and we want to avoid making the whole component as not ready, if the request for
// one of the resource types requires reinitialization, requests for all other
// resource types can still be handled properly.
func (ctl *Controller) Hook(ctx genericapiserver.PostStartHookContext) error {
deadlineCtx, cancel := context.WithTimeout(ctx, ctl.storageInitializationTimeout)
defer cancel()
err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true,
func(_ context.Context) (bool, error) {
if ok := ctl.storageReadinessCheck(); ok {
return true, nil
}
return false, nil
})
if errors.Is(err, context.DeadlineExceeded) {
klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring")
}
return nil
}
20 changes: 20 additions & 0 deletions pkg/search/proxy/store/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package store

import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -118,3 +119,22 @@ func (c *clusterCache) cacheForResource(gvr schema.GroupVersionResource) *resour
defer c.lock.RUnlock()
return c.cache[gvr]
}

// readinessCheck checks if the storage is ready for accepting requests.
func (c *clusterCache) readinessCheck() error {
c.lock.RLock()
defer c.lock.RUnlock()

var failedChecks []string
for gvr, rc := range c.cache {
if rc.ReadinessCheck() != nil {
failedChecks = append(failedChecks, gvr.String())
}
}
if len(failedChecks) == 0 {
klog.Infof("ClusterCache(%s) is ready for all registered resources", c.clusterName)
return nil
}
klog.V(4).Infof("ClusterCache(%s) is not ready for: %v", c.clusterName, failedChecks)
return fmt.Errorf("ClusterCache(%s) is not ready for: %v", c.clusterName, failedChecks)
}
22 changes: 22 additions & 0 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Store interface {
Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)

// ReadinessCheck checks if the storage is ready for accepting requests.
ReadinessCheck() error
}

// MultiClusterCache caches resource from multi member clusters
Expand All @@ -72,6 +75,25 @@ func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error),
}
}

// ReadinessCheck checks if the storage is ready for accepting requests.
func (c *MultiClusterCache) ReadinessCheck() error {
c.lock.RLock()
defer c.lock.RUnlock()

var failedChecks []string
for cluster, cc := range c.cache {
if cc.readinessCheck() != nil {
failedChecks = append(failedChecks, cluster)
}
}
if len(failedChecks) == 0 {
klog.Infof("MultiClusterCache is ready for all registered clusters")
return nil
}
klog.V(4).Infof("ClusterCache is not ready for clusters: %v", failedChecks)
return fmt.Errorf("ClusterCache is not ready for clusters: %v", failedChecks)
}

// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
if klog.V(3).Enabled() {
Expand Down
11 changes: 11 additions & 0 deletions pkg/search/proxy/store/multi_cluster_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -36,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/dynamic"
fakedynamic "k8s.io/client-go/dynamic/fake"
Expand Down Expand Up @@ -418,6 +420,15 @@ func TestMultiClusterCache_Get(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err = wait.PollUntilContextCancel(tt.args.ctx, 100*time.Millisecond, true,
func(_ context.Context) (bool, error) {
if checkErr := cache.ReadinessCheck(); checkErr == nil {
return true, nil
}
return false, nil
})
assert.NoError(t, err, "Deadline exceeded while waiting for storage readiness")

obj, err := cache.Get(tt.args.ctx, tt.args.gvr, tt.args.name, tt.args.options)
if !tt.want.errAssert(err) {
t.Errorf("Unexpected error: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/search/proxy/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ func (s *store) RequestWatchProgress(context.Context) error {
}

// ReadinessCheck checks if the storage is ready for accepting requests.
// Since store itself does not actually hold the data but only provides
// methods for querying, and the caller will not use this method to detect
// the ready status, so it is not necessary to implement this interface.
func (s *store) ReadinessCheck() error {
return fmt.Errorf("not implemented")
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/search/proxy/testing/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,8 @@ func (c *MockStore) Watch(ctx context.Context, gvr schema.GroupVersionResource,

return c.WatchFunc(ctx, gvr, options)
}

// ReadinessCheck checks if the storage is ready for accepting requests.
func (c *MockStore) ReadinessCheck() error {
return nil
}

0 comments on commit 079d0ab

Please sign in to comment.