Skip to content

Commit

Permalink
wait for search multiClusterCache ReadinessCheck to be ok
Browse files Browse the repository at this point in the history
Signed-off-by: changzhen <[email protected]>
  • Loading branch information
XiShanYongYe-Chang committed Nov 20, 2024
1 parent c950edd commit e001898
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 15 deletions.
15 changes: 9 additions & 6 deletions cmd/karmada-search/app/karmada-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func run(ctx context.Context, o *options.Options, registryOptions ...Option) err
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("search-storage-cache-readiness", config.ExtraConfig.ProxyController.Hook)

if config.ExtraConfig.Controller != nil {
server.GenericAPIServer.AddPostStartHookOrDie("start-karmada-search-controller", func(context genericapiserver.PostStartHookContext) error {
// start ResourceRegistry controller
Expand Down Expand Up @@ -210,12 +212,13 @@ func config(o *options.Options, outOfTreeRegistryOptions ...Option) (*search.Con
}

proxyCtl, err = proxy.NewController(proxy.NewControllerOption{
RestConfig: serverConfig.ClientConfig,
RestMapper: restMapper,
KubeFactory: serverConfig.SharedInformerFactory,
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
OutOfTreeRegistry: outOfTreeRegistry,
RestConfig: serverConfig.ClientConfig,
RestMapper: restMapper,
KubeFactory: serverConfig.SharedInformerFactory,
KarmadaFactory: factory,
MinRequestTimeout: time.Second * time.Duration(serverConfig.Config.MinRequestTimeout),
StorageInitializationTimeout: serverConfig.StorageInitializationTimeout,
OutOfTreeRegistry: outOfTreeRegistry,
})

if err != nil {
Expand Down
44 changes: 36 additions & 8 deletions pkg/search/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package proxy

import (
"context"
"errors"
"net/http"
"time"

Expand All @@ -26,10 +27,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -66,6 +69,8 @@ type Controller struct {
store store.Store

proxy framework.Proxy

storageInitializationTimeout time.Duration
}

// NewControllerOption is the Option for NewController().
Expand All @@ -76,7 +81,8 @@ type NewControllerOption struct {
KubeFactory informers.SharedInformerFactory
KarmadaFactory informerfactory.SharedInformerFactory

MinRequestTimeout time.Duration
MinRequestTimeout time.Duration
StorageInitializationTimeout time.Duration

OutOfTreeRegistry pluginruntime.Registry
}
Expand All @@ -97,13 +103,14 @@ func NewController(option NewControllerOption) (*Controller, error) {
proxy := pluginruntime.NewFramework(allPlugins)

ctl := &Controller{
restMapper: option.RestMapper,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
secretLister: secretLister,
clusterLister: clusterLister,
registryLister: option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: multiClusterStore,
proxy: proxy,
restMapper: option.RestMapper,
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
secretLister: secretLister,
clusterLister: clusterLister,
registryLister: option.KarmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: multiClusterStore,
storageInitializationTimeout: option.StorageInitializationTimeout,
proxy: proxy,
}

workerOptions := util.Options{
Expand Down Expand Up @@ -312,3 +319,24 @@ func dynamicClientForClusterFunc(clusterLister clusterlisters.ClusterLister,
return dynamic.NewForConfig(clusterConfig)
}
}

// storageReadinessCheck reports whether the underlying multi-cluster store
// is ready to serve requests (i.e. its ReadinessCheck returns no error).
func (ctl *Controller) storageReadinessCheck() bool {
	err := ctl.store.ReadinessCheck()
	return err == nil
}

// Hook is registered as a post-start hook and waits for the controller's
// storage to become ready, polling every 100ms up to
// storageInitializationTimeout. It is best-effort by design: it always
// returns nil so that server startup is never blocked, logging a warning
// instead when readiness was not reached in time (caches keep warming up
// in the background).
func (ctl *Controller) Hook(ctx genericapiserver.PostStartHookContext) error {
	deadlineCtx, cancel := context.WithTimeout(ctx, ctl.storageInitializationTimeout)
	defer cancel()
	err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true,
		func(_ context.Context) (bool, error) {
			return ctl.storageReadinessCheck(), nil
		})
	switch {
	case err == nil:
		// Storage became ready within the timeout.
	case errors.Is(err, context.DeadlineExceeded):
		// klog.Warning (not Warningf): the message has no format verbs.
		klog.Warning("Deadline exceeded while waiting for storage readiness... ignoring")
	default:
		// e.g. the parent hook context was canceled during shutdown;
		// previously such errors were silently swallowed.
		klog.Warningf("Error while waiting for storage readiness... ignoring: %v", err)
	}
	return nil
}
20 changes: 20 additions & 0 deletions pkg/search/proxy/store/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package store

import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -118,3 +119,22 @@ func (c *clusterCache) cacheForResource(gvr schema.GroupVersionResource) *resour
defer c.lock.RUnlock()
return c.cache[gvr]
}

// readinessCheck checks if the storage is ready for accepting requests,
// i.e. every resource cache registered for this member cluster reports
// ready. On failure the returned error lists the not-ready GVRs.
func (c *clusterCache) readinessCheck() error {
	c.lock.RLock()
	defer c.lock.RUnlock()

	var failedChecks []string
	for gvr, rc := range c.cache {
		if err := rc.ReadinessCheck(); err != nil {
			failedChecks = append(failedChecks, gvr.String())
		}
	}
	if len(failedChecks) == 0 {
		klog.Infof("ClusterCache(%s) is ready for all registered resources", c.clusterName)
		return nil
	}
	klog.Errorf("ClusterCache(%s) is not ready for: %v", c.clusterName, failedChecks)
	// Error strings start lower-case per Go convention (staticcheck ST1005).
	return fmt.Errorf("cluster cache %s is not ready for: %v", c.clusterName, failedChecks)
}
20 changes: 20 additions & 0 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Store interface {
Get(ctx context.Context, gvr schema.GroupVersionResource, name string, options *metav1.GetOptions) (runtime.Object, error)
List(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (runtime.Object, error)
Watch(ctx context.Context, gvr schema.GroupVersionResource, options *metainternalversion.ListOptions) (watch.Interface, error)
ReadinessCheck() error
}

// MultiClusterCache caches resource from multi member clusters
Expand All @@ -72,6 +73,25 @@ func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error),
}
}

// ReadinessCheck checks if the storage is ready for accepting requests,
// i.e. the cache of every registered member cluster reports ready. On
// failure the returned error lists the not-ready clusters.
func (c *MultiClusterCache) ReadinessCheck() error {
	c.lock.RLock()
	defer c.lock.RUnlock()

	var failedChecks []string
	for cluster, cc := range c.cache {
		if err := cc.readinessCheck(); err != nil {
			failedChecks = append(failedChecks, cluster)
		}
	}
	if len(failedChecks) == 0 {
		klog.Infof("MultiClusterCache is ready for all registered clusters")
		return nil
	}
	// Say "MultiClusterCache" here: the previous text claimed "ClusterCache",
	// which is the per-cluster type and made logs ambiguous.
	klog.Errorf("MultiClusterCache is not ready for clusters: %v", failedChecks)
	// Error strings start lower-case per Go convention (staticcheck ST1005).
	return fmt.Errorf("multi-cluster cache is not ready for clusters: %v", failedChecks)
}

// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
if klog.V(3).Enabled() {
Expand Down
11 changes: 11 additions & 0 deletions pkg/search/proxy/store/multi_cluster_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -36,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/dynamic"
fakedynamic "k8s.io/client-go/dynamic/fake"
Expand Down Expand Up @@ -418,6 +420,15 @@ func TestMultiClusterCache_Get(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err = wait.PollUntilContextCancel(tt.args.ctx, 100*time.Millisecond, true,
func(_ context.Context) (bool, error) {
if checkErr := cache.ReadinessCheck(); checkErr == nil {
return true, nil
}
return false, nil
})
assert.NoError(t, err, "Deadline exceeded while waiting for storage readiness")

obj, err := cache.Get(tt.args.ctx, tt.args.gvr, tt.args.name, tt.args.options)
if !tt.want.errAssert(err) {
t.Errorf("Unexpected error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/search/proxy/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *store) RequestWatchProgress(context.Context) error {

// ReadinessCheck checks if the storage is ready for accepting requests.
// A plain single-cluster store performs no asynchronous initialization,
// so it is always considered ready.
func (s *store) ReadinessCheck() error {
	return nil
}

func (s *store) client(namespace string) (dynamic.ResourceInterface, error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/search/proxy/testing/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,8 @@ func (c *MockStore) Watch(ctx context.Context, gvr schema.GroupVersionResource,

return c.WatchFunc(ctx, gvr, options)
}

// ReadinessCheck checks if the storage is ready for accepting requests.
// The mock implementation unconditionally reports ready so tests never
// block on storage initialization.
func (c *MockStore) ReadinessCheck() error {
	return nil
}

0 comments on commit e001898

Please sign in to comment.