Skip to content

Commit

Permalink
Resolve functions using function objects registered in apiserver
Browse files Browse the repository at this point in the history
The intent is that many users will run a function mirror or similar.
  • Loading branch information
justinsb committed Oct 28, 2022
1 parent fa37dbc commit 17132d4
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 38 deletions.
3 changes: 3 additions & 0 deletions porch/deployments/porch/5-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ rules:
resources:
["mutatingwebhookconfigurations", "validatingwebhookconfigurations"]
verbs: ["get", "watch", "list"]
- apiGroups: ["porch.kpt.dev"]
resources: ["functions"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: ["config.porch.kpt.dev"]
resources: ["repositories", "repositories/status"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
Expand Down
31 changes: 22 additions & 9 deletions porch/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (

"github.com/GoogleContainerTools/kpt/internal/fnruntime"
"github.com/GoogleContainerTools/kpt/porch/api/porch/install"
porchapi "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
configapi "github.com/GoogleContainerTools/kpt/porch/api/porchconfig/v1alpha1"
internalapi "github.com/GoogleContainerTools/kpt/porch/internal/api/porchinternal/v1alpha1"
"github.com/GoogleContainerTools/kpt/porch/pkg/cache"
"github.com/GoogleContainerTools/kpt/porch/pkg/engine"
"github.com/GoogleContainerTools/kpt/porch/pkg/kpt"
"github.com/GoogleContainerTools/kpt/porch/pkg/meta"
"github.com/GoogleContainerTools/kpt/porch/pkg/registry/porch"
"google.golang.org/api/option"
Expand Down Expand Up @@ -73,6 +73,7 @@ type ExtraConfig struct {
CoreAPIKubeconfigPath string
CacheDirectory string
FunctionRunnerAddress string
DefaultImagePrefix string
}

// Config defines the config for the apiserver
Expand Down Expand Up @@ -147,6 +148,11 @@ func (c completedConfig) getCoreClient() (client.WithWatch, error) {
if err := configapi.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error building scheme: %w", err)
}

if err := porchapi.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error building scheme: %w", err)
}

if err := corev1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error building scheme: %w", err)
}
Expand Down Expand Up @@ -210,18 +216,25 @@ func (c completedConfig) New() (*PorchServer, error) {
referenceResolver := porch.NewReferenceResolver(coreClient)
userInfoProvider := &porch.ApiserverUserInfoProvider{}

runnerOptions := fnruntime.RunnerOptions{}
runnerOptions.ResolveToImage = resolveToImagePorch

runnerOptions.InitDefaults()

renderer := kpt.NewRenderer(runnerOptions)

cache := cache.NewCache(c.ExtraConfig.CacheDirectory, cache.CacheOptions{
CredentialResolver: credentialResolver,
UserInfoProvider: userInfoProvider,
MetadataStore: metadataStore,
})

runnerOptionsResolver := func(namespace string) fnruntime.RunnerOptions {
runnerOptions := fnruntime.RunnerOptions{}
runnerOptions.InitDefaults()
r := &KubeFunctionResolver{
client: coreClient,
defaultImagePrefix: c.ExtraConfig.DefaultImagePrefix,
namespace: namespace,
}
runnerOptions.ResolveToImage = r.resolveToImagePorch

return runnerOptions
}

cad, err := engine.NewCaDEngine(
engine.WithCache(cache),
// The order of registering the function runtimes matters here. When
Expand All @@ -230,7 +243,7 @@ func (c completedConfig) New() (*PorchServer, error) {
engine.WithBuiltinFunctionRuntime(),
engine.WithGRPCFunctionRuntime(c.ExtraConfig.FunctionRunnerAddress),
engine.WithCredentialResolver(credentialResolver),
engine.WithRenderer(renderer),
engine.WithRunnerOptionsResolver(runnerOptionsResolver),
engine.WithReferenceResolver(referenceResolver),
engine.WithUserInfoProvider(userInfoProvider),
engine.WithMetadataStore(metadataStore),
Expand Down
36 changes: 33 additions & 3 deletions porch/pkg/apiserver/resolvetoimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,55 @@ import (
"strings"

"github.com/GoogleContainerTools/kpt/internal/util/porch"
"github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// KubeFunctionResolver resolves function names to full image paths
type KubeFunctionResolver struct {
client client.WithWatch
defaultImagePrefix string
// resolver *FunctionResolver
namespace string
}

// resolveToImagePorch converts the function short path to the full image url.
// If the function is Catalog function, it adds "gcr.io/kpt-fn/".e.g. set-namespace:v0.1 --> gcr.io/kpt-fn/set-namespace:v0.1
// If the function is porch function, it queries porch to get the function image by name and namespace.
// e.g. default:set-namespace:v0.1 --> us-west1-docker.pkg.dev/cpa-kit-dev/packages/set-namespace:v0.1
func resolveToImagePorch(ctx context.Context, image string) (string, error) {
func (r *KubeFunctionResolver) resolveToImagePorch(ctx context.Context, image string) (string, error) {
segments := strings.Split(image, ":")
if len(segments) == 4 {
// Porch function
// TODO: Remove this legacy configuration
functionName := strings.Join(segments[1:], ":")
function, err := porch.FunctionGetter{}.Get(ctx, functionName, segments[0])
if err != nil {
return "", fmt.Errorf("failed to resolve image: %w", err)
return "", fmt.Errorf("failed to get image for function %q: %w", image, err)
}
return function.Spec.Image, nil
}
if !strings.Contains(image, "/") {
return fmt.Sprintf("gcr.io/kpt-fn/%s", image), nil
var function v1alpha1.Function
// TODO: Use fieldSelectors and better lookup
name := "functions:" + image + ":latest"
key := types.NamespacedName{
Namespace: r.namespace,
Name: name,
}
// We query the apiserver for these types, even if we could query directly; this will then work with CRDs etc.
// TODO: We need to think about priority-and-fairness with loopback queries
if err := r.client.Get(ctx, key, &function); err != nil {
if !apierrors.IsNotFound(err) {
return "", fmt.Errorf("failed to get image for function %q: %w", image, err)
}
} else {
return function.Spec.Image, nil
}
// TODO: Fallback to cluster-scoped?
return r.defaultImagePrefix + image, nil
}
return image, nil
}
3 changes: 3 additions & 0 deletions porch/pkg/cmd/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type PorchServerOptions struct {
CacheDirectory string
CoreAPIKubeconfigPath string
FunctionRunnerAddress string
DefaultImagePrefix string

SharedInformerFactory informers.SharedInformerFactory
StdOut io.Writer
Expand Down Expand Up @@ -182,6 +183,7 @@ func (o *PorchServerOptions) Config() (*apiserver.Config, error) {
CoreAPIKubeconfigPath: o.CoreAPIKubeconfigPath,
CacheDirectory: o.CacheDirectory,
FunctionRunnerAddress: o.FunctionRunnerAddress,
DefaultImagePrefix: o.DefaultImagePrefix,
},
}
return config, nil
Expand Down Expand Up @@ -225,5 +227,6 @@ func (o *PorchServerOptions) AddFlags(fs *pflag.FlagSet) {
}

fs.StringVar(&o.FunctionRunnerAddress, "function-runner", "", "Address of the function runner gRPC service.")
fs.StringVar(&o.DefaultImagePrefix, "default-image-prefix", "gcr.io/kpt-fn/", "Default prefix for unqualified function names")
fs.StringVar(&o.CacheDirectory, "cache-directory", "", "Directory where Porch server stores repository and package caches.")
}
40 changes: 26 additions & 14 deletions porch/pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"unicode"

"github.com/GoogleContainerTools/kpt/internal/builtins"
"github.com/GoogleContainerTools/kpt/internal/fnruntime"
kptfile "github.com/GoogleContainerTools/kpt/pkg/api/kptfile/v1"
"github.com/GoogleContainerTools/kpt/pkg/fn"
api "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
Expand All @@ -44,6 +45,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/kyaml/comments"
"sigs.k8s.io/kustomize/kyaml/kio"
"sigs.k8s.io/kustomize/kyaml/yaml"
Expand Down Expand Up @@ -137,8 +139,11 @@ func NewCaDEngine(opts ...EngineOption) (CaDEngine, error) {
}

type cadEngine struct {
cache *cache.Cache
renderer fn.Renderer
cache *cache.Cache

// runnerOptionsResolver returns the RunnerOptions for function execution in the specified namespace.
runnerOptionsResolver func(namespace string) fnruntime.RunnerOptions

runtime fn.FunctionRuntime
credentialResolver repository.CredentialResolver
referenceResolver ReferenceResolver
Expand Down Expand Up @@ -411,7 +416,7 @@ func (cad *cadEngine) applyTasks(ctx context.Context, draft repository.PackageDr
}

// Render package after creation.
mutations = cad.conditionalAddRender(mutations)
mutations = cad.conditionalAddRender(obj, mutations)

baseResources := repository.PackageResources{}
if _, _, err := applyResourceMutations(ctx, draft, baseResources, mutations); err != nil {
Expand Down Expand Up @@ -488,14 +493,17 @@ func (cad *cadEngine) mapTaskToMutation(ctx context.Context, obj *api.PackageRev
// TODO: We should find a different way to do this. Probably a separate
// task for render.
if task.Eval.Image == "render" {
runnerOptions := cad.runnerOptionsResolver(obj.Namespace)
return &renderPackageMutation{
renderer: cad.renderer,
runtime: cad.runtime,
runnerOptions: runnerOptions,
runtime: cad.runtime,
}, nil
} else {
runnerOptions := cad.runnerOptionsResolver(obj.Namespace)
return &evalFunctionMutation{
runtime: cad.runtime,
task: task,
runnerOptions: runnerOptions,
runtime: cad.runtime,
task: task,
}, nil
}

Expand Down Expand Up @@ -598,7 +606,7 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
}

// Re-render if we are making changes.
mutations = cad.conditionalAddRender(mutations)
mutations = cad.conditionalAddRender(newObj, mutations)

draft, err := repo.UpdatePackageRevision(ctx, oldPackage.repoPackageRevision)
if err != nil {
Expand All @@ -620,7 +628,7 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
}

// Re-render if we are making changes.
mutations = cad.conditionalAddRender(mutations)
mutations = cad.conditionalAddRender(newObj, mutations)

// TODO: Handle the case if alongside lifecycle change, tasks are changed too.
// Update package contents only if the package is in draft state
Expand Down Expand Up @@ -752,7 +760,7 @@ func convertStatusToKptfile(s api.ConditionStatus) kptfile.ConditionStatus {

// conditionalAddRender adds a render mutation to the end of the mutations slice if the last
// entry is not already a render mutation.
func (cad *cadEngine) conditionalAddRender(mutations []mutation) []mutation {
func (cad *cadEngine) conditionalAddRender(subject client.Object, mutations []mutation) []mutation {
if len(mutations) == 0 {
return mutations
}
Expand All @@ -763,9 +771,11 @@ func (cad *cadEngine) conditionalAddRender(mutations []mutation) []mutation {
return mutations
}

runnerOptions := cad.runnerOptionsResolver(subject.GetNamespace())

return append(mutations, &renderPackageMutation{
renderer: cad.renderer,
runtime: cad.runtime,
runnerOptions: runnerOptions,
runtime: cad.runtime,
})
}

Expand Down Expand Up @@ -888,6 +898,8 @@ func (cad *cadEngine) UpdatePackageResources(ctx context.Context, repositoryObj
return nil, nil, err
}

runnerOptions := cad.runnerOptionsResolver(old.GetNamespace())

mutations := []mutation{
&mutationReplaceResources{
newResources: new,
Expand Down Expand Up @@ -916,8 +928,8 @@ func (cad *cadEngine) UpdatePackageResources(ctx context.Context, repositoryObj
draft,
appliedResources,
[]mutation{&renderPackageMutation{
renderer: cad.renderer,
runtime: cad.runtime,
runnerOptions: runnerOptions,
runtime: cad.runtime,
}})

// No lifecycle change when updating package resources; updates are done.
Expand Down
20 changes: 15 additions & 5 deletions porch/pkg/engine/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (
)

type evalFunctionMutation struct {
runtime fn.FunctionRuntime
task *api.Task
runtime fn.FunctionRuntime
runnerOptions fnruntime.RunnerOptions
task *api.Task
}

func (m *evalFunctionMutation) Apply(ctx context.Context, resources repository.PackageResources) (repository.PackageResources, *api.TaskResult, error) {
Expand All @@ -40,11 +41,20 @@ func (m *evalFunctionMutation) Apply(ctx context.Context, resources repository.P

e := m.task.Eval

function := v1.Function{
Image: e.Image,
}
if function.Image != "" && m.runnerOptions.ResolveToImage != nil {
img, err := m.runnerOptions.ResolveToImage(ctx, function.Image)
if err != nil {
return repository.PackageResources{}, nil, err
}
function.Image = img
}

// TODO: Apply should accept filesystem instead of PackageResources

runner, err := m.runtime.GetRunner(ctx, &v1.Function{
Image: e.Image,
})
runner, err := m.runtime.GetRunner(ctx, &function)
if err != nil {
return repository.PackageResources{}, nil, fmt.Errorf("failed to create function runner: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions porch/pkg/engine/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package engine
import (
"fmt"

"github.com/GoogleContainerTools/kpt/internal/fnruntime"
"github.com/GoogleContainerTools/kpt/pkg/fn"
"github.com/GoogleContainerTools/kpt/porch/pkg/cache"
"github.com/GoogleContainerTools/kpt/porch/pkg/kpt"
Expand Down Expand Up @@ -88,9 +89,13 @@ func WithSimpleFunctionRuntime() EngineOption {
})
}

func WithRenderer(renderer fn.Renderer) EngineOption {
func WithRunnerOptions(options fnruntime.RunnerOptions) EngineOption {
return WithRunnerOptionsResolver(func(namespace string) fnruntime.RunnerOptions { return options })
}

func WithRunnerOptionsResolver(fn func(namespace string) fnruntime.RunnerOptions) EngineOption {
return EngineOptionFunc(func(engine *cadEngine) error {
engine.renderer = renderer
engine.runnerOptionsResolver = fn
return nil
})
}
Expand Down
9 changes: 6 additions & 3 deletions porch/pkg/engine/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (
"path"
"strings"

"github.com/GoogleContainerTools/kpt/internal/fnruntime"
fnresult "github.com/GoogleContainerTools/kpt/pkg/api/fnresult/v1"
"github.com/GoogleContainerTools/kpt/pkg/fn"
api "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
"github.com/GoogleContainerTools/kpt/porch/pkg/kpt"
"github.com/GoogleContainerTools/kpt/porch/pkg/repository"
"go.opentelemetry.io/otel/trace"
"k8s.io/klog/v2"
"sigs.k8s.io/kustomize/kyaml/filesys"
)

type renderPackageMutation struct {
renderer fn.Renderer
runtime fn.FunctionRuntime
runtime fn.FunctionRuntime
runnerOptions fnruntime.RunnerOptions
}

var _ mutation = &renderPackageMutation{}
Expand Down Expand Up @@ -62,7 +64,8 @@ func (m *renderPackageMutation) Apply(ctx context.Context, resources repository.
// TODO: we should handle this better
klog.Warningf("skipping render as no package was found")
} else {
result, err := m.renderer.Render(ctx, fs, fn.RenderOptions{
renderer := kpt.NewRenderer(m.runnerOptions)
result, err := renderer.Render(ctx, fs, fn.RenderOptions{
PkgPath: pkgPath,
Runtime: m.runtime,
})
Expand Down
4 changes: 2 additions & 2 deletions porch/pkg/engine/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestRender(t *testing.T) {
runnerOptions.InitDefaults()

render := &renderPackageMutation{
renderer: kpt.NewRenderer(runnerOptions),
runtime: kpt.NewSimpleFunctionRuntime(),
runnerOptions: runnerOptions,
runtime: kpt.NewSimpleFunctionRuntime(),
}

testdata, err := filepath.Abs(filepath.Join(".", "testdata", "simple-render"))
Expand Down

0 comments on commit 17132d4

Please sign in to comment.