Skip to content

Commit

Permalink
Refactor install RG command
Browse files Browse the repository at this point in the history
  • Loading branch information
mortent committed Feb 16, 2022
1 parent 10868bd commit b76af35
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 143 deletions.
65 changes: 0 additions & 65 deletions commands/installrg.go

This file was deleted.

3 changes: 2 additions & 1 deletion commands/livecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/GoogleContainerTools/kpt/internal/cmdapply"
"github.com/GoogleContainerTools/kpt/internal/cmddestroy"
"github.com/GoogleContainerTools/kpt/internal/cmdinstallrg"
"github.com/GoogleContainerTools/kpt/internal/cmdliveinit"
"github.com/GoogleContainerTools/kpt/internal/cmdmigrate"
"github.com/GoogleContainerTools/kpt/internal/docs/generated/livedocs"
Expand Down Expand Up @@ -56,7 +57,7 @@ func GetLiveCommand(ctx context.Context, _, version string) *cobra.Command {
applyCmd := cmdapply.NewCommand(ctx, f, ioStreams)
destroyCmd := cmddestroy.NewCommand(ctx, f, ioStreams)
statusCmd := status.NewCommand(ctx, f)
installRGCmd := GetInstallRGRunner(f, ioStreams).Command
installRGCmd := cmdinstallrg.NewCommand(ctx, f, ioStreams)
liveCmd.AddCommand(initCmd, applyCmd, destroyCmd, statusCmd, installRGCmd)

// Add the migrate command to change from ConfigMap to ResourceGroup inventory
Expand Down
83 changes: 83 additions & 0 deletions internal/cmdinstallrg/cmdinstallrg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmdinstallrg

import (
"context"
"fmt"

"github.com/GoogleContainerTools/kpt/internal/docs/generated/livedocs"
"github.com/GoogleContainerTools/kpt/pkg/live"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

// NewRunner returns a command runner
func NewRunner(ctx context.Context, factory cmdutil.Factory,
ioStreams genericclioptions.IOStreams) *Runner {

r := &Runner{
ctx: ctx,
ioStreams: ioStreams,
factory: factory,
}
c := &cobra.Command{
Use: "install-resource-group",
RunE: r.runE,
PreRunE: r.preRunE,
Short: livedocs.InstallResourceGroupShort,
Long: livedocs.InstallResourceGroupShort + "\n" + livedocs.InstallResourceGroupLong,
Example: livedocs.InstallResourceGroupExamples,
}
r.Command = c
return r
}

func NewCommand(ctx context.Context, factory cmdutil.Factory,
ioStreams genericclioptions.IOStreams) *cobra.Command {
return NewRunner(ctx, factory, ioStreams).Command
}

// Runner contains the run function
type Runner struct {
ctx context.Context
Command *cobra.Command
ioStreams genericclioptions.IOStreams
factory cmdutil.Factory
}

func (r *Runner) preRunE(_ *cobra.Command, _ []string) error {
return nil
}

func (r *Runner) runE(_ *cobra.Command, args []string) error {
// Validate the number of arguments.
if len(args) > 0 {
return fmt.Errorf("too many arguments; install-resource-group takes no arguments")
}
fmt.Fprint(r.ioStreams.Out, "installing inventory ResourceGroup CRD...")

err := (&live.ResourceGroupInstaller{
Factory: r.factory,
}).InstallRG(r.ctx)

if err == nil {
fmt.Fprintln(r.ioStreams.Out, "success")
} else {
fmt.Fprintln(r.ioStreams.Out, "failed")
}
return err
}
5 changes: 4 additions & 1 deletion internal/cmdmigrate/migratecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ func (mr *MigrateRunner) applyCRD() error {
return nil
}
// Install the ResourceGroup CRD to the cluster.
err := live.InstallResourceGroupCRD(mr.factory)

err := (&live.ResourceGroupInstaller{
Factory: mr.factory,
}).InstallRG(mr.ctx)
if err == nil {
fmt.Fprintln(mr.ioStreams.Out, "success")
} else {
Expand Down
4 changes: 3 additions & 1 deletion internal/cmdutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
func InstallResourceGroupCRD(ctx context.Context, f util.Factory) error {
pr := printer.FromContextOrDie(ctx)
pr.Printf("installing inventory ResourceGroup CRD.\n")
err := live.InstallResourceGroupCRD(f)
err := (&live.ResourceGroupInstaller{
Factory: f,
}).InstallRG(ctx)
if err != nil {
return &ResourceGroupCRDInstallError{
Err: err,
Expand Down
161 changes: 86 additions & 75 deletions pkg/live/inventoryrg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ import (
"time"

"github.com/GoogleContainerTools/kpt/pkg/status"
"github.com/GoogleContainerTools/kpt/thirdparty/cli-utils/pkg/apply/taskrunner"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"k8s.io/kubectl/pkg/util"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
kstatus "sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/kustomize/kyaml/yaml"
)

const (
applyCRDTimeout = 10 * time.Second
applyCRDPollInterval = 2 * time.Second
applyRGTimeout = 10 * time.Second
applyRGPollInterval = 2 * time.Second
)

// ResourceGroupGVK is the group/version/kind of the custom
Expand Down Expand Up @@ -244,83 +248,90 @@ func ResourceGroupCRDApplied(factory cmdutil.Factory) bool {
return true
}

// InstallResourceGroupCRD applies the custom resource definition for the
// ResourceGroup by creating and running a TaskQueue of Tasks necessary.
// The Tasks are 1) Apply CRD task, 2) Wait Task (for CRD to become
// established), and 3) Reset RESTMapper task. Returns an error if
// a non-"AlreadyExists" error is returned on the event channel.
// Runs the CRD installation in a separate goroutine (timeout
// ensures no hanging).
func InstallResourceGroupCRD(factory cmdutil.Factory) error {
eventChannel := make(chan event.Event)
go func() {
defer close(eventChannel)
mapper, err := factory.ToRESTMapper()
if err != nil {
handleError(eventChannel, err)
return
}
crd, err := rgCRD(mapper)
if err != nil {
handleError(eventChannel, err)
return
}
// Create the task to apply the ResourceGroup CRD.
applyRGTask := NewApplyCRDTask(factory, crd)
objs := object.UnstructuredSetToObjMetadataSet([]*unstructured.Unstructured{crd})
// Create the tasks to apply the ResourceGroup CRD.
tasks := []taskrunner.Task{
applyRGTask,
taskrunner.NewWaitTask("wait-rg-crd", objs, taskrunner.AllCurrent,
applyCRDTimeout, mapper),
}
// Create the task queue channel, and send tasks in order into the channel.
taskQueue := make(chan taskrunner.Task, len(tasks))
for _, t := range tasks {
taskQueue <- t
}
statusPoller, err := status.NewStatusPoller(factory)
if err != nil {
handleError(eventChannel, err)
return
}
// Create a new cache map to hold the last known resource state & status
resourceCache := cache.NewResourceCacheMap()
// Run the task queue.
runner := taskrunner.NewTaskStatusRunner(objs, statusPoller, resourceCache)
err = runner.Run(context.Background(), taskQueue, eventChannel, taskrunner.Options{
PollInterval: applyCRDPollInterval,
UseCache: true,
EmitStatusEvents: true,
})
if err != nil {
handleError(eventChannel, err)
return
}
}()
// ResourceGroupInstaller can install the ResourceGroup CRD into a cluster.
type ResourceGroupInstaller struct {
Factory cmdutil.Factory
}

// Return the error on the eventChannel if it exists; return
// closes the channel. "AlreadyExists" is NOT an error.
for e := range eventChannel {
if e.Type == event.ErrorType {
err := e.ErrorEvent.Err
if !apierrors.IsAlreadyExists(err) {
return err
}
func (rgi *ResourceGroupInstaller) InstallRG(ctx context.Context) error {
poller, err := status.NewStatusPoller(rgi.Factory)
if err != nil {
return err
}

mapper, err := rgi.Factory.ToRESTMapper()
if err != nil {
return err
}

crd, err := rgCRD(mapper)
if err != nil {
return err
}

if err := rgi.applyRG(crd); err != nil {
if apierrors.IsAlreadyExists(err) {
return nil
}
return err
}

return nil
// Replace with call to meta.MaybeResetRESTMapper when we update
// k8s libraries.
m, ok := mapper.(interface{ Reset() })
if ok {
m.Reset()
}

objs := object.UnstructuredSetToObjMetadataSet([]*unstructured.Unstructured{crd})
ctx, cancel := context.WithTimeout(ctx, applyRGTimeout)
return func() error {
defer cancel()
for e := range poller.Poll(ctx, objs, polling.Options{PollInterval: applyRGPollInterval}) {
switch e.EventType {
case pollevent.ErrorEvent:
return e.Error
case pollevent.ResourceUpdateEvent:
if e.Resource.Status == kstatus.CurrentStatus {
// TODO: Replace this with a call to meta.MaybeResetRESTMapper
// once we update the k8s libraries.
m, err := taskrunner.ExtractDeferredDiscoveryRESTMapper(mapper)
if err != nil {
return err
}
m.Reset()
return nil
}
}
}
return nil
}()
}

// handleError sends an error onto the event channel.
func handleError(eventChannel chan event.Event, err error) {
eventChannel <- event.Event{
Type: event.ErrorType,
ErrorEvent: event.ErrorEvent{
Err: err,
},
func (rgi *ResourceGroupInstaller) applyRG(crd runtime.Object) error {
mapper, err := rgi.Factory.ToRESTMapper()
if err != nil {
return err
}
mapping, err := mapper.RESTMapping(crdGroupKind)
if err != nil {
return err
}
client, err := rgi.Factory.UnstructuredClientForMapping(mapping)
if err != nil {
return err
}

// Set the "last-applied-annotation" so future applies work correctly.
if err := util.CreateApplyAnnotation(crd, unstructured.UnstructuredJSONScheme); err != nil {
return err
}
// Apply the CRD to the cluster and ignore already exists error.
var clearResourceVersion = false
var emptyNamespace = ""
helper := resource.NewHelper(client, mapping)
_, err = helper.Create(emptyNamespace, clearResourceVersion, crd)
return err
}

// rgCRD returns the ResourceGroup CRD in Unstructured format or an error.
Expand Down
Loading

0 comments on commit b76af35

Please sign in to comment.