Skip to content

Commit

Permalink
rollouts: add caching for discovered packages (kptdev#3706)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherFry authored and droot committed Feb 8, 2023
1 parent 5186d27 commit 7d70e8b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
38 changes: 33 additions & 5 deletions rollouts/controllers/rollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"context"
"flag"
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -58,6 +60,9 @@ type RolloutReconciler struct {
store *clusterstore.ClusterStore

Scheme *runtime.Scheme

mutex sync.Mutex
packageDiscoveryCache map[types.NamespacedName]*packagediscovery.PackageDiscovery
}

//+kubebuilder:rbac:groups=gitops.kpt.dev,resources=rollouts,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -105,6 +110,14 @@ func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
} else {
// The object is being deleted
if controllerutil.ContainsFinalizer(&rollout, myFinalizerName) {
func() {
r.mutex.Lock()
defer r.mutex.Unlock()

// clean cache
delete(r.packageDiscoveryCache, req.NamespacedName)
}()

// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(&rollout, myFinalizerName)
if err := r.Update(ctx, &rollout); err != nil {
Expand All @@ -115,15 +128,30 @@ func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

err := r.reconcileRollout(ctx, &rollout)
packageDiscoveryClient := r.getPackageDiscoveryClient(req.NamespacedName)

err := r.reconcileRollout(ctx, &rollout, packageDiscoveryClient)
if err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *RolloutReconciler) reconcileRollout(ctx context.Context, rollout *gitopsv1alpha1.Rollout) error {
func (r *RolloutReconciler) getPackageDiscoveryClient(rolloutNamespacedName types.NamespacedName) *packagediscovery.PackageDiscovery {
r.mutex.Lock()
defer r.mutex.Unlock()

client, found := r.packageDiscoveryCache[rolloutNamespacedName]
if !found {
client = packagediscovery.NewPackageDiscovery(r.Client, rolloutNamespacedName.Namespace)
r.packageDiscoveryCache[rolloutNamespacedName] = client
}

return client
}

func (r *RolloutReconciler) reconcileRollout(ctx context.Context, rollout *gitopsv1alpha1.Rollout, packageDiscoveryClient *packagediscovery.PackageDiscovery) error {
logger := log.FromContext(ctx)

clusters, err := r.store.ListClusters(ctx, rollout.Spec.Targets.Selector)
Expand All @@ -132,9 +160,7 @@ func (r *RolloutReconciler) reconcileRollout(ctx context.Context, rollout *gitop
}
logger.Info("discovered clusters", "count", len(clusters.Items))

packageDiscoveryClient := packagediscovery.NewPackageDiscovery(rollout.Spec.Packages, r.Client, rollout.Namespace)

discoveredPackages, err := packageDiscoveryClient.GetPackages(ctx)
discoveredPackages, err := packageDiscoveryClient.GetPackages(ctx, rollout.Spec.Packages)
if err != nil {
logger.Error(err, "failed to discover packages")
return client.IgnoreNotFound(err)
Expand Down Expand Up @@ -423,6 +449,8 @@ func (r *RolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
r.Client = mgr.GetClient()

r.packageDiscoveryCache = make(map[types.NamespacedName]*packagediscovery.PackageDiscovery)

// setup the clusterstore
r.store = &clusterstore.ClusterStore{
Config: mgr.GetConfig(),
Expand Down
2 changes: 1 addition & 1 deletion rollouts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/GoogleCloudPlatform/k8s-config-connector v1.98.0
github.com/golang/protobuf v1.5.2
github.com/google/cel-go v0.13.0
github.com/google/go-cmp v0.5.9
github.com/google/go-github/v48 v48.2.0
github.com/onsi/ginkgo/v2 v2.1.6
github.com/onsi/gomega v1.20.1
Expand Down Expand Up @@ -46,7 +47,6 @@ require (
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand Down
56 changes: 40 additions & 16 deletions rollouts/pkg/packagediscovery/packagediscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@ import (
"net/http"
"regexp"
"strings"
"sync"
"time"

gitopsv1alpha1 "github.com/GoogleContainerTools/kpt/rollouts/api/v1alpha1"
"github.com/google/go-cmp/cmp"
"github.com/google/go-github/v48/github"
"golang.org/x/oauth2"
coreapi "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PackageDiscovery struct {
config gitopsv1alpha1.PackagesConfig
client client.Client
namespace string
mutex sync.Mutex
cache *Cache
}

type DiscoveredPackage struct {
Expand All @@ -41,45 +45,64 @@ type DiscoveredPackage struct {
Revision string
}

func NewPackageDiscovery(config gitopsv1alpha1.PackagesConfig, client client.Client, namespace string) *PackageDiscovery {
type Cache struct {
config gitopsv1alpha1.PackagesConfig
packages []DiscoveredPackage
expiration time.Time
}

func NewPackageDiscovery(client client.Client, namespace string) *PackageDiscovery {
return &PackageDiscovery{
config: config,
client: client,
namespace: namespace,
}
}

func (d *PackageDiscovery) GetPackages(ctx context.Context) ([]DiscoveredPackage, error) {
if d.config.SourceType != gitopsv1alpha1.GitHub {
return nil, fmt.Errorf("%v source type not supported yet", d.config.SourceType)
func (d *PackageDiscovery) GetPackages(ctx context.Context, config gitopsv1alpha1.PackagesConfig) ([]DiscoveredPackage, error) {
d.mutex.Lock()
defer d.mutex.Unlock()

if d.useCache(config) {
return d.cache.packages, nil
}

gitHubClient, err := d.getGitHubClient(ctx)
if config.SourceType != gitopsv1alpha1.GitHub {
return nil, fmt.Errorf("%v source type not supported yet", config.SourceType)
}

gitHubSelector := config.GitHub.Selector

gitHubClient, err := d.getGitHubClient(ctx, gitHubSelector)
if err != nil {
return nil, fmt.Errorf("unable to create github client: %w", err)
}

discoveredPackages := []DiscoveredPackage{}

repositoryNames, err := d.getRepositoryNames(gitHubClient, ctx)
repositoryNames, err := d.getRepositoryNames(gitHubClient, gitHubSelector, ctx)
if err != nil {
return nil, fmt.Errorf("unable to get repositories: %w", err)
}

for _, repositoryName := range repositoryNames {
repoPackages, err := d.getPackagesForRepository(gitHubClient, ctx, repositoryName)
repoPackages, err := d.getPackagesForRepository(gitHubClient, ctx, gitHubSelector, repositoryName)
if err != nil {
return nil, fmt.Errorf("unable to get packages: %w", err)
}

discoveredPackages = append(discoveredPackages, repoPackages...)
}

d.cache = &Cache{
packages: discoveredPackages,
config: config,
expiration: time.Now().Add(1 * time.Minute),
}

return discoveredPackages, nil
}

func (d *PackageDiscovery) getRepositoryNames(gitHubClient *github.Client, ctx context.Context) ([]string, error) {
selector := d.config.GitHub.Selector
func (d *PackageDiscovery) getRepositoryNames(gitHubClient *github.Client, selector gitopsv1alpha1.GitHubSelector, ctx context.Context) ([]string, error) {
repositoryNames := []string{}

if isSelectorField(selector.Repo) {
Expand Down Expand Up @@ -107,9 +130,8 @@ func (d *PackageDiscovery) getRepositoryNames(gitHubClient *github.Client, ctx c
return repositoryNames, nil
}

func (d *PackageDiscovery) getPackagesForRepository(gitHubClient *github.Client, ctx context.Context, repoName string) ([]DiscoveredPackage, error) {
func (d *PackageDiscovery) getPackagesForRepository(gitHubClient *github.Client, ctx context.Context, selector gitopsv1alpha1.GitHubSelector, repoName string) ([]DiscoveredPackage, error) {
discoveredPackages := []DiscoveredPackage{}
selector := d.config.GitHub.Selector

if isSelectorField(selector.Directory) {
tree, _, err := gitHubClient.Git.GetTree(ctx, selector.Org, repoName, selector.Revision, true)
Expand Down Expand Up @@ -138,9 +160,7 @@ func (d *PackageDiscovery) getPackagesForRepository(gitHubClient *github.Client,
return discoveredPackages, nil
}

func (d *PackageDiscovery) getGitHubClient(ctx context.Context) (*github.Client, error) {
selector := d.config.GitHub.Selector

func (d *PackageDiscovery) getGitHubClient(ctx context.Context, selector gitopsv1alpha1.GitHubSelector) (*github.Client, error) {
httpClient := &http.Client{}

if secretName := selector.SecretRef.Name; secretName != "" {
Expand All @@ -164,6 +184,10 @@ func (d *PackageDiscovery) getGitHubClient(ctx context.Context) (*github.Client,
return gitHubClient, nil
}

func (d *PackageDiscovery) useCache(config gitopsv1alpha1.PackagesConfig) bool {
return d.cache != nil && cmp.Equal(config, d.cache.config) && time.Now().Before(d.cache.expiration)
}

func filterByPattern(pattern string, list []string) []string {
matches := []string{}

Expand Down

0 comments on commit 7d70e8b

Please sign in to comment.