diff --git a/porch/config/deploy/2-function-runner.yaml b/porch/config/deploy/2-function-runner.yaml
index 52e64b690e..10612df835 100644
--- a/porch/config/deploy/2-function-runner.yaml
+++ b/porch/config/deploy/2-function-runner.yaml
@@ -45,10 +45,24 @@ spec:
             - /server --config=/config.yaml --functions=/functions --pod-namespace=porch-fn-system --wrapper-server-image=gcr.io/example-google-project-id/porch-wrapper-server:latest
           ports:
             - containerPort: 9445
+          # Add a gRPC readiness probe to ensure the cache is ready
+          readinessProbe:
+            exec:
+              command:
+                - /grpc-health-probe
+                - -addr
+                - localhost:9445
           resources:
             requests:
               memory: "64Mi"
               cpu: "125m"
+          volumeMounts:
+            - mountPath: /pod-cache-config
+              name: pod-cache-config-volume
+      volumes:
+        - name: pod-cache-config-volume
+          configMap:
+            name: pod-cache-config
 
 ---
 apiVersion: v1
@@ -63,3 +77,31 @@ spec:
     - port: 9445
       protocol: TCP
       targetPort: 9445
+
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: pod-cache-config
+  namespace: porch-system
+data:
+  pod-cache-config.yaml: |
+    gcr.io/kpt-fn/apply-replacements:v0.1.0: 30m
+    gcr.io/kpt-fn/apply-setters:v0.2.0: 30m
+    gcr.io/kpt-fn/create-setters:v0.1.0: 30m
+    gcr.io/kpt-fn/ensure-name-substring:v0.2.0: 30m
+    gcr.io/kpt-fn/gatekeeper:v0.2.1: 30m
+    gcr.io/kpt-fn/kubeval:v0.2.0: 30m
+    gcr.io/kpt-fn/search-replace:v0.2.0: 30m
+    gcr.io/kpt-fn/set-annotations:v0.1.4: 30m
+    gcr.io/kpt-fn/set-enforcement-action:v0.1.0: 30m
+    gcr.io/kpt-fn/set-image:v0.1.0: 30m
+    gcr.io/kpt-fn/set-labels:v0.1.5: 30m
+    gcr.io/kpt-fn/set-namespace:v0.3.4: 30m
+    gcr.io/kpt-fn/starlark:v0.4.2: 30m
+    gcr.io/kpt-fn/upsert-resource:v0.2.0: 30m
+    gcr.io/kpt-fn/enable-gcp-services:v0.1.0: 30m
+    gcr.io/kpt-fn/export-terraform:v0.1.0: 30m
+    gcr.io/kpt-fn/generate-folders:v0.1.1: 30m
+    gcr.io/kpt-fn/remove-local-config-resources:v0.1.0: 30m
+    gcr.io/kpt-fn/set-project-id:v0.2.0: 30m
diff --git a/porch/func/Dockerfile b/porch/func/Dockerfile
index 0097eaf348..d8a3ea520a 100644
--- a/porch/func/Dockerfile
+++ b/porch/func/Dockerfile
@@ -27,6 +27,9 @@ FROM gcr.io/kpt-fn/upsert-resource:v0.2.0 as upsert-resource
 
 FROM golang:1.17.6-alpine3.15 as builder
 WORKDIR /go/src/github.com/GoogleContainerTools/kpt
+RUN go install github.com/grpc-ecosystem/grpc-health-probe@v0.4.11
+RUN cp $GOPATH/bin/grpc-health-probe /grpc-health-probe
+
 COPY go.mod go.sum ./
 COPY porch/go.mod porch/go.sum porch/
 COPY porch/api/go.mod porch/api/go.sum porch/api/
@@ -52,7 +55,7 @@ COPY --from=set-namespace /usr/local/bin/function /functions/set-namesp
 COPY --from=set-project-id /usr/local/bin/function /functions/set-project-id
 COPY --from=starlark /usr/local/bin/star /functions/starlark
 COPY --from=upsert-resource /usr/local/bin/function /functions/upsert-resource
-COPY --from=builder /server /config.yaml /
+COPY --from=builder /server /grpc-health-probe /config.yaml /
 
 EXPOSE 9445/tcp
 ENTRYPOINT [ "/server", "--config=/config.yaml", "--functions=/functions" ]
diff --git a/porch/func/Dockerfile-wrapperserver b/porch/func/Dockerfile-wrapperserver
index 2a36f4457b..0c9d212a57 100644
--- a/porch/func/Dockerfile-wrapperserver
+++ b/porch/func/Dockerfile-wrapperserver
@@ -6,7 +6,7 @@ COPY go.mod go.sum ./
 COPY porch/go.mod porch/go.sum porch/
 COPY porch/api/go.mod porch/api/go.sum porch/api/
-RUN go install github.com/grpc-ecosystem/grpc-health-probe@v0.4.8
+RUN go install github.com/grpc-ecosystem/grpc-health-probe@v0.4.11
 
 COPY porch/func/ porch/func/
 RUN cd porch/func ; go build -v -o /wrapper-server/wrapper-server ./wrapper-server
 RUN cp $GOPATH/bin/grpc-health-probe /wrapper-server/
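Note: the readiness probe above execs the grpc-health-probe binary that both Dockerfiles now bundle; it dials the given address, issues a standard grpc.health.v1 Check, and exits non-zero unless the reply is SERVING. A minimal Go sketch of the equivalent check (illustrative only, not part of the patch):

    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/health/grpc_health_v1"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        // Dial the port the readiness probe targets.
        conn, err := grpc.DialContext(ctx, "localhost:9445",
            grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatalf("dial: %v", err)
        }
        defer conn.Close()
        resp, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{})
        if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
            log.Fatalf("not ready: status=%v err=%v", resp.GetStatus(), err)
        }
    }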
diff --git a/porch/func/healthchecker/healthchecker.go b/porch/func/healthchecker/healthchecker.go
new file mode 100644
index 0000000000..4ee0fe5f51
--- /dev/null
+++ b/porch/func/healthchecker/healthchecker.go
@@ -0,0 +1,42 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package healthchecker
+
+import (
+    "context"
+
+    "google.golang.org/grpc/health/grpc_health_v1"
+    "k8s.io/klog/v2"
+)
+
+type HealthChecker struct{}
+
+func NewHealthChecker() *HealthChecker {
+    return &HealthChecker{}
+}
+
+func (s *HealthChecker) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
+    klog.Info("Serving the Check request for health check")
+    return &grpc_health_v1.HealthCheckResponse{
+        Status: grpc_health_v1.HealthCheckResponse_SERVING,
+    }, nil
+}
+
+func (s *HealthChecker) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
+    klog.Info("Serving the Watch request for health check")
+    return server.Send(&grpc_health_v1.HealthCheckResponse{
+        Status: grpc_health_v1.HealthCheckResponse_SERVING,
+    })
+}
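Note: this shared checker reports SERVING unconditionally. The probe is still meaningful because NewPodEvaluator blocks on cache warmup before the server starts listening (see server.go below), so the Check cannot succeed until the pod cache is warm. A hypothetical alternative (an assumption, not what this patch does) would use the stock health server from google.golang.org/grpc/health and flip the status explicitly:

    package main

    import (
        "net"

        "google.golang.org/grpc"
        "google.golang.org/grpc/health"
        "google.golang.org/grpc/health/grpc_health_v1"
    )

    func main() {
        server := grpc.NewServer()
        hs := health.NewServer()
        // Report NOT_SERVING while caches warm up; the exec probe fails until then.
        hs.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
        grpc_health_v1.RegisterHealthServer(server, hs)
        go func() {
            // ... warm up the pod cache here, then:
            hs.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
        }()
        lis, err := net.Listen("tcp", ":9445")
        if err != nil {
            panic(err)
        }
        if err := server.Serve(lis); err != nil {
            panic(err)
        }
    }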
diff --git a/porch/func/internal/podevaluator.go b/porch/func/internal/podevaluator.go
index d4a40d19a1..22f5f1d8e6 100644
--- a/porch/func/internal/podevaluator.go
+++ b/porch/func/internal/podevaluator.go
@@ -18,6 +18,7 @@ import (
     "context"
     "fmt"
     "net"
+    "os"
     "path"
     "strconv"
     "strings"
@@ -30,6 +31,7 @@ import (
     "github.com/google/go-containerregistry/pkg/v1/remote"
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials/insecure"
+    "gopkg.in/yaml.v2"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,7 +49,8 @@ const (
     wrapperServerBin = "wrapper-server"
     gRPCProbeBin     = "grpc-health-probe"
     krmFunctionLabel = "fn.kpt.dev/image"
-    lastUseTimeAnnotation = "fn.kpt.dev/last-use"
+    reclaimAfterAnnotation = "fn.kpt.dev/reclaim-after"
+    fieldManagerName       = "krm-function-runner"
 
     channelBufferSize = 128
 )
@@ -60,7 +63,7 @@ type podEvaluator struct {
 
 var _ Evaluator = &podEvaluator{}
 
-func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Duration) (Evaluator, error) {
+func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Duration, podTTLConfig string) (Evaluator, error) {
     restCfg, err := config.GetConfig()
     if err != nil {
         return nil, fmt.Errorf("failed to get rest config: %w", err)
@@ -92,14 +95,20 @@ func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Du
         },
     }
     go pe.podCacheManager.podCacheManager()
+
+    // TODO(mengqiy): add a watcher that supports reloading the cache when the config file changes.
+    err = pe.podCacheManager.warmupCache(podTTLConfig)
+    // If we can't warm up the cache, we can still proceed without it.
+    if err != nil {
+        klog.Warningf("unable to warm up the pod cache: %v", err)
+    }
     return pe, nil
 }
 
 func (pe *podEvaluator) EvaluateFunction(ctx context.Context, req *evaluator.EvaluateFunctionRequest) (*evaluator.EvaluateFunctionResponse, error) {
     starttime := time.Now()
     defer func() {
-        endtime := time.Now()
-        klog.Infof("evaluating %v in pod takes %v", req.Image, endtime.Sub(starttime))
+        klog.Infof("evaluating %v in pod took %v", req.Image, time.Now().Sub(starttime))
     }()
     // make a buffer for the channel to prevent unnecessary blocking when the pod cache manager sends it to multiple waiting goroutines in batch.
     ccChan := make(chan *clientConnAndError, 1)
@@ -163,6 +172,45 @@ type imagePodAndGRPCClient struct {
     err error
 }
 
+func (pcm *podCacheManager) warmupCache(podTTLConfig string) error {
+    start := time.Now()
+    defer func() {
+        klog.Infof("cache warming completed in %v", time.Now().Sub(start))
+    }()
+    content, err := os.ReadFile(podTTLConfig)
+    if err != nil {
+        return err
+    }
+    var podsTTL map[string]string
+    err = yaml.Unmarshal(content, &podsTTL)
+    if err != nil {
+        return err
+    }
+
+    var wg sync.WaitGroup
+    for fnImage, ttlStr := range podsTTL {
+        wg.Add(1)
+        go func(img, ttlSt string) {
+            klog.Infof("preloading pod cache for function %v with TTL %v", img, ttlSt)
+            defer wg.Done()
+            ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+            defer cancel()
+            ttl, err := time.ParseDuration(ttlSt)
+            if err != nil {
+                klog.Warningf("unable to parse duration from the config file for function %v: %v", img, err)
+                ttl = pcm.podTTL
+            }
+            // We invoke the function with useGenerateName=false so that the pod name is fixed,
+            // since we want to ensure only one pod is created for each function.
+            pcm.podManager.getFuncEvalPodClient(ctx, img, ttl, false)
+            klog.Infof("preloaded pod cache for function %v", img)
+        }(fnImage, ttlStr)
+    }
+    // Wait for the cache warming to finish before returning.
+    wg.Wait()
+    return nil
+}
+
 func (pcm *podCacheManager) podCacheManager() {
     tick := time.Tick(pcm.gcScanInternal)
     for {
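Note: warmupCache expects the mounted pod-cache-config.yaml to be a flat map from function image to a Go duration string, falling back to the pod TTL for entries it cannot parse. A self-contained sketch of that parse-and-fallback path (the image name and fallback value are examples):

    package main

    import (
        "fmt"
        "time"

        "gopkg.in/yaml.v2"
    )

    func main() {
        // Same shape as the ConfigMap data: function image -> TTL string.
        content := []byte("gcr.io/kpt-fn/set-labels:v0.1.5: 30m\n")
        var podsTTL map[string]string
        if err := yaml.Unmarshal(content, &podsTTL); err != nil {
            panic(err)
        }
        for img, ttlStr := range podsTTL {
            ttl, err := time.ParseDuration(ttlStr)
            if err != nil {
                ttl = 30 * time.Minute // fall back to the --pod-ttl default
            }
            fmt.Printf("%s -> %v\n", img, ttl)
        }
    }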
+ go pcm.podManager.getFuncEvalPodClient(context.Background(), req.image, pcm.podTTL, true) case resp := <-pcm.podReadyCh: if resp.err != nil { klog.Warningf("received error from the pod manager: %v", resp.err) + } else { pcm.cache[resp.image] = resp.podAndGRPCClient } channels := pcm.waitlists[resp.image] @@ -237,22 +288,23 @@ func (pcm *podCacheManager) garbageCollector() { if pod.DeletionTimestamp != nil { continue } - lastUse, found := pod.Annotations[lastUseTimeAnnotation] + reclaimAfterStr, found := pod.Annotations[reclaimAfterAnnotation] // If a pod doesn't have a last-use annotation, we patch it. This should not happen, but if it happens, // we give another TTL before deleting it. if !found { - go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod)) + go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod), pcm.podTTL) continue } else { - lu, err := strconv.ParseInt(lastUse, 10, 64) + reclaimAfter, err := strconv.ParseInt(reclaimAfterStr, 10, 64) // If the annotation is ill-formatted, we patch it with the current time and will try to GC it later. // This should not happen, but if it happens, we give another TTL before deleting it. if err != nil { klog.Warningf("unable to convert the Unix time string to int64: %w", err) - go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod)) + go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod), pcm.podTTL) continue } - if time.Unix(lu, 0).Add(pcm.podTTL).Before(time.Now()) { + // If the current time is after the reclaim-ater annotation in the pod, we delete the pod and remove the corresponding cache entry. + if time.Now().After(time.Unix(reclaimAfter, 0)) { podIP := pod.Status.PodIP go func(po corev1.Pod) { klog.Infof("deleting pod %v/%v", po.Namespace, po.Name) @@ -275,7 +327,6 @@ func (pcm *podCacheManager) garbageCollector() { // or we can't split the host and port in the client target. delete(pcm.cache, image) } - } } } @@ -292,15 +343,22 @@ type podManager struct { // podReadyCh is a channel to receive requests to get GRPC client from each function evaluation request handler. podReadyCh chan<- *imagePodAndGRPCClient - // entrypointCache is a cache of image name to entrypoint. + // imageMetadataCache is a cache of image name to digestAndEntrypoint. // Only podManager is allowed to touch this cache. - // Its underlying type is map[string][]string. - entrypointCache sync.Map + // Its underlying type is map[string]*digestAndEntrypoint. + imageMetadataCache sync.Map +} + +type digestAndEntrypoint struct { + // digest is a hex string + digest string + // entrypoint is the entrypoint of the image + entrypoint []string } -func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string) { +func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, ttl time.Duration, useGenerateName bool) { c, err := func() (*podAndGRPCClient, error) { - podKey, err := pm.retrieveOrCreatePod(ctx, image) + podKey, err := pm.retrieveOrCreatePod(ctx, image, ttl, useGenerateName) if err != nil { return nil, err } @@ -329,7 +387,11 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string) { } // imageEntrypoint get the entrypoint of a container image by looking at its metadata. 
@@ -237,22 +288,23 @@ func (pcm *podCacheManager) garbageCollector() {
         if pod.DeletionTimestamp != nil {
             continue
         }
-        lastUse, found := pod.Annotations[lastUseTimeAnnotation]
+        reclaimAfterStr, found := pod.Annotations[reclaimAfterAnnotation]
         // If a pod doesn't have a last-use annotation, we patch it. This should not happen, but if it happens,
         // we give another TTL before deleting it.
         if !found {
-            go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod))
+            go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod), pcm.podTTL)
             continue
         } else {
-            lu, err := strconv.ParseInt(lastUse, 10, 64)
+            reclaimAfter, err := strconv.ParseInt(reclaimAfterStr, 10, 64)
             // If the annotation is ill-formatted, we patch it with the current time and will try to GC it later.
             // This should not happen, but if it happens, we give another TTL before deleting it.
             if err != nil {
                 klog.Warningf("unable to convert the Unix time string to int64: %w", err)
-                go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod))
+                go patchPodWithUnixTimeAnnotation(pcm.podManager.kubeClient, client.ObjectKeyFromObject(&pod), pcm.podTTL)
                 continue
             }
-            if time.Unix(lu, 0).Add(pcm.podTTL).Before(time.Now()) {
+            // If the current time is after the reclaim-after annotation in the pod, we delete the pod and remove the corresponding cache entry.
+            if time.Now().After(time.Unix(reclaimAfter, 0)) {
                 podIP := pod.Status.PodIP
                 go func(po corev1.Pod) {
                     klog.Infof("deleting pod %v/%v", po.Namespace, po.Name)
@@ -275,7 +327,6 @@ func (pcm *podCacheManager) garbageCollector() {
                 // or we can't split the host and port in the client target.
                 delete(pcm.cache, image)
             }
-
         }
     }
 }
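Note: the GC contract changes from relative to absolute time here: rather than storing a last-use timestamp and adding the TTL at scan time, each pod carries a fn.kpt.dev/reclaim-after Unix timestamp stamped as now+TTL, so a scan is a single clock comparison. A small sketch of that test (the helper name is illustrative):

    package podcache

    import (
        "strconv"
        "time"
    )

    // reclaimable mirrors the GC test above: the annotation holds an absolute
    // Unix timestamp (now+TTL when stamped), so expiry is a single comparison.
    func reclaimable(reclaimAfter string, now time.Time) bool {
        ts, err := strconv.ParseInt(reclaimAfter, 10, 64)
        if err != nil {
            // Ill-formatted annotations are re-stamped with a fresh TTL, not deleted.
            return false
        }
        return now.After(time.Unix(ts, 0))
    }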
@@ -292,15 +343,22 @@ type podManager struct {
     // podReadyCh is a channel to receive requests to get GRPC client from each function evaluation request handler.
     podReadyCh chan<- *imagePodAndGRPCClient
 
-    // entrypointCache is a cache of image name to entrypoint.
+    // imageMetadataCache is a cache of image name to digestAndEntrypoint.
     // Only podManager is allowed to touch this cache.
-    // Its underlying type is map[string][]string.
-    entrypointCache sync.Map
+    // Its underlying type is map[string]*digestAndEntrypoint.
+    imageMetadataCache sync.Map
+}
+
+type digestAndEntrypoint struct {
+    // digest is a hex string
+    digest string
+    // entrypoint is the entrypoint of the image
+    entrypoint []string
 }
 
-func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string) {
+func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, ttl time.Duration, useGenerateName bool) {
     c, err := func() (*podAndGRPCClient, error) {
-        podKey, err := pm.retrieveOrCreatePod(ctx, image)
+        podKey, err := pm.retrieveOrCreatePod(ctx, image, ttl, useGenerateName)
         if err != nil {
             return nil, err
         }
@@ -329,7 +387,11 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string) {
 }
 
 // imageEntrypoint gets the entrypoint of a container image by looking at its metadata.
-func (pm *podManager) imageEntrypoint(image string) ([]string, error) {
+func (pm *podManager) imageDigestAndEntrypoint(image string) (*digestAndEntrypoint, error) {
+    start := time.Now()
+    defer func() {
+        klog.Infof("getting image metadata for %v took %v", image, time.Now().Sub(start))
+    }()
     var entrypoint []string
     ref, err := name.ParseReference(image)
     if err != nil {
@@ -339,6 +401,10 @@ func (pm *podManager) imageEntrypoint(image string) ([]string, error) {
     if err != nil {
         return nil, err
     }
+    hash, err := img.Digest()
+    if err != nil {
+        return nil, err
+    }
     cf, err := img.ConfigFile()
     if err != nil {
         return nil, err
@@ -351,17 +417,41 @@ func (pm *podManager) imageEntrypoint(image string) ([]string, error) {
     } else {
         entrypoint = cfg.Cmd
     }
-    pm.entrypointCache.Store(image, entrypoint)
-    return entrypoint, nil
+    de := &digestAndEntrypoint{
+        digest:     hash.Hex,
+        entrypoint: entrypoint,
+    }
+    pm.imageMetadataCache.Store(image, de)
+    return de, nil
 }
 
-func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string) (client.ObjectKey, error) {
+func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl time.Duration, useGenerateName bool) (client.ObjectKey, error) {
+    var de *digestAndEntrypoint
+    var err error
+    val, found := pm.imageMetadataCache.Load(image)
+    if !found {
+        de, err = pm.imageDigestAndEntrypoint(image)
+        if err != nil {
+            return client.ObjectKey{}, fmt.Errorf("unable to get the entrypoint for %v: %w", image, err)
+        }
+    } else {
+        de = val.(*digestAndEntrypoint)
+    }
+
+    podId, err := podID(image, de.digest)
+    if err != nil {
+        return client.ObjectKey{}, err
+    }
+
     // Try to retrieve the pod. Lookup the pod by label to see if there is a pod that can be reused.
     // Looking it up locally may not work if there is more than one instance of the function runner,
     // since the pod may be created by one of the other instances and the current instance is not aware of it.
     // TODO: It's possible to set up a Watch in the fn runner namespace, and always try to maintain an up-to-date local cache.
     podList := &corev1.PodList{}
-    err := pm.kubeClient.List(ctx, podList, client.InNamespace(pm.namespace), client.MatchingLabels(map[string]string{krmFunctionLabel: transformImageName(image)}))
+    err = pm.kubeClient.List(ctx, podList, client.InNamespace(pm.namespace), client.MatchingLabels(map[string]string{krmFunctionLabel: podId}))
+    if err != nil {
+        klog.Warningf("error when listing pods for %q: %v", image, err)
+    }
     if err == nil && len(podList.Items) > 0 {
         // TODO: maybe we should randomly pick one that is not being deleted.
         for _, pod := range podList.Items {
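Note: imageDigestAndEntrypoint resolves both pieces of image metadata with go-containerregistry in one remote fetch — the digest (used for pod naming below) and the effective entrypoint, falling back to CMD — and memoizes the result in imageMetadataCache. A standalone sketch of the same lookup (illustrative; error handling reduced to panics):

    package main

    import (
        "fmt"

        "github.com/google/go-containerregistry/pkg/name"
        "github.com/google/go-containerregistry/pkg/v1/remote"
    )

    func main() {
        ref, err := name.ParseReference("gcr.io/kpt-fn/set-namespace:v0.3.4")
        if err != nil {
            panic(err)
        }
        // One network round trip; the result is memoized in imageMetadataCache.
        img, err := remote.Image(ref)
        if err != nil {
            panic(err)
        }
        digest, err := img.Digest()
        if err != nil {
            panic(err)
        }
        cf, err := img.ConfigFile()
        if err != nil {
            panic(err)
        }
        entrypoint := cf.Config.Entrypoint
        if len(entrypoint) == 0 {
            entrypoint = cf.Config.Cmd // fall back to CMD, as the code above does
        }
        fmt.Println(digest.Hex[:8], entrypoint)
    }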
@@ -372,21 +462,10 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string) (cl
         }
     }
 
-    var entrypoint []string
-    val, found := pm.entrypointCache.Load(image)
-    if !found {
-        entrypoint, err = pm.imageEntrypoint(image)
-        if err != nil {
-            return client.ObjectKey{}, fmt.Errorf("unable to get the entrypoint for %v: %w", image, err)
-        }
-    } else {
-        entrypoint = val.([]string)
-    }
-
     cmd := append([]string{
         path.Join(volumeMountPath, wrapperServerBin),
         "--port", defaultWrapperServerPort,
         "--",
-    }, entrypoint...)
+    }, de.entrypoint...)
 
     // Create a pod
     pod := &corev1.Pod{
@@ -395,15 +474,15 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string) (cl
             Kind:       "Pod",
         },
         ObjectMeta: metav1.ObjectMeta{
-            Namespace:    pm.namespace,
-            GenerateName: "krm-fn-",
+            Namespace: pm.namespace,
             Annotations: map[string]string{
-                lastUseTimeAnnotation: fmt.Sprintf("%v", time.Now().Unix()),
+                reclaimAfterAnnotation: fmt.Sprintf("%v", time.Now().Add(ttl).Unix()),
             },
-            // The function runner can use the label to retrieve the pod
+            // The function runner can use the label to retrieve the pod. The label is the function name plus part of its digest.
+            // If a function has more than one tag pointing to the same digest, we can reuse the same pod.
             // TODO: controller-runtime provides field indexer, we can potentially use it to index spec.containers[*].image field.
             Labels: map[string]string{
-                krmFunctionLabel: transformImageName(image),
+                krmFunctionLabel: podId,
             },
         },
         Spec: corev1.PodSpec{
@@ -459,10 +538,20 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string) (cl
             },
         },
     }
-    err = pm.kubeClient.Create(ctx, pod)
-    if err != nil {
-        return client.ObjectKey{}, err
+    if useGenerateName {
+        pod.GenerateName = podId + "-"
+        err = pm.kubeClient.Create(ctx, pod, client.FieldOwner(fieldManagerName))
+        if err != nil {
+            return client.ObjectKey{}, fmt.Errorf("unable to create the pod: %w", err)
+        }
+    } else {
+        pod.Name = podId
+        err = pm.kubeClient.Patch(ctx, pod, client.Apply, client.FieldOwner(fieldManagerName))
+        if err != nil {
+            return client.ObjectKey{}, fmt.Errorf("unable to apply the pod: %w", err)
+        }
     }
+    klog.Infof("created KRM function evaluator pod %v/%v for %q", pod.Namespace, pod.Name, image)
     return client.ObjectKeyFromObject(pod), nil
 }
@@ -491,8 +580,8 @@ func (pm *podManager) podIpIfRunningAndReady(ctx context.Context, podKey client.
     return pod.Status.PodIP, nil
 }
 
-func patchPodWithUnixTimeAnnotation(cl client.Client, podKey client.ObjectKey) {
-    patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%v": "%v"}}}`, lastUseTimeAnnotation, time.Now().Unix()))
+func patchPodWithUnixTimeAnnotation(cl client.Client, podKey client.ObjectKey, ttl time.Duration) {
+    patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%v": "%v"}}}`, reclaimAfterAnnotation, time.Now().Add(ttl).Unix()))
     pod := &corev1.Pod{}
     pod.Namespace = podKey.Namespace
     pod.Name = podKey.Name
@@ -501,6 +590,15 @@ func patchPodWithUnixTimeAnnotation(cl client.Client, podKey client.ObjectKey) {
     }
 }
 
-func transformImageName(image string) string {
-    return strings.ReplaceAll(strings.ReplaceAll(image, "/", "__"), ":", "___")
+func podID(image, hash string) (string, error) {
+    ref, err := name.ParseReference(image)
+    if err != nil {
+        return "", fmt.Errorf("unable to parse image reference %v: %w", image, err)
+    }
+
+    // repoName will be something like gcr.io/kpt-fn/set-namespace
+    repoName := ref.Context().Name()
+    parts := strings.Split(repoName, "/")
+    name := strings.ReplaceAll(parts[len(parts)-1], "_", "-")
+    return fmt.Sprintf("%v-%v", name, hash[:8]), nil
 }
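Note: podID is what makes pods shareable across tags — two tags of the same function image resolve to the same digest and therefore the same pod identity — while the warmup path (fixed name plus server-side apply) stays idempotent and the on-demand path (GenerateName) avoids colliding with a same-ID pod that is still terminating. A self-contained re-derivation of the naming rule, with a hypothetical digest:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Hypothetical digest; podID only keeps the first 8 hex characters.
        repoName := "gcr.io/kpt-fn/set-namespace" // ref.Context().Name() for the image above
        hash := "1a2b3c4d5e6f7081"
        parts := strings.Split(repoName, "/")
        fnName := strings.ReplaceAll(parts[len(parts)-1], "_", "-")
        fmt.Printf("%v-%v\n", fnName, hash[:8]) // prints: set-namespace-1a2b3c4d
    }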
diff --git a/porch/func/server/server.go b/porch/func/server/server.go
index cf70627051..ccc1904775 100644
--- a/porch/func/server/server.go
+++ b/porch/func/server/server.go
@@ -23,8 +23,10 @@ import (
     "time"
 
     pb "github.com/GoogleContainerTools/kpt/porch/func/evaluator"
+    "github.com/GoogleContainerTools/kpt/porch/func/healthchecker"
     "github.com/GoogleContainerTools/kpt/porch/func/internal"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/health/grpc_health_v1"
     "k8s.io/klog/v2"
 )
 
@@ -37,8 +39,9 @@ var (
     port               = flag.Int("port", 9445, "The server port")
     functions          = flag.String("functions", "./functions", "Path to cached functions.")
     config             = flag.String("config", "./config.yaml", "Path to the config file.")
+    podCacheConfig     = flag.String("pod-cache-config", "/pod-cache-config/pod-cache-config.yaml", "Path to the pod cache config file. The file is a map of function name to TTL.")
     podNamespace       = flag.String("pod-namespace", "porch-fn-system", "Namespace to run KRM functions pods.")
-    podTTL             = flag.Duration("pod-ttl", 10*time.Minute, "TTL for pods before GC.")
+    podTTL             = flag.Duration("pod-ttl", 30*time.Minute, "TTL for pods before GC.")
     scanInterval       = flag.Duration("scan-interval", time.Minute, "The interval of GC between scans.")
     wrapperServerImage = flag.String("wrapper-server-image", "", "Image name of the wrapper server.")
     disableRuntimes    = flag.String("disable-runtimes", "", fmt.Sprintf("The runtime(s) to disable. Multiple runtimes should be separated by `,`. Available runtimes: `%v`, `%v`.", execRuntime, podRuntime))
@@ -80,7 +83,7 @@ func run() error {
         }
         runtimes = append(runtimes, execEval)
     case podRuntime:
-        podEval, err := internal.NewPodEvaluator(*podNamespace, *wrapperServerImage, *scanInterval, *podTTL)
+        podEval, err := internal.NewPodEvaluator(*podNamespace, *wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig)
         if err != nil {
             return fmt.Errorf("failed to initialize pod evaluator: %w", err)
         }
@@ -97,6 +100,8 @@ func run() error {
     // Start the gRPC server
     server := grpc.NewServer()
     pb.RegisterFunctionEvaluatorServer(server, evaluator)
+    healthService := healthchecker.NewHealthChecker()
+    grpc_health_v1.RegisterHealthServer(server, healthService)
     if err := server.Serve(lis); err != nil {
         return fmt.Errorf("server failed: %w", err)
     }
diff --git a/porch/func/wrapper-server/main.go b/porch/func/wrapper-server/main.go
index 647a9c9740..b94667364d 100644
--- a/porch/func/wrapper-server/main.go
+++ b/porch/func/wrapper-server/main.go
@@ -24,6 +24,7 @@ import (
     "os/exec"
 
     pb "github.com/GoogleContainerTools/kpt/porch/func/evaluator"
+    "github.com/GoogleContainerTools/kpt/porch/func/healthchecker"
     "github.com/spf13/cobra"
     "google.golang.org/grpc"
     "google.golang.org/grpc/codes"
@@ -73,7 +74,7 @@ func (o *options) run() error {
     // Start the gRPC server
     server := grpc.NewServer()
     pb.RegisterFunctionEvaluatorServer(server, evaluator)
-    healthService := NewHealthChecker()
+    healthService := healthchecker.NewHealthChecker()
     grpc_health_v1.RegisterHealthServer(server, healthService)
 
     if err := server.Serve(lis); err != nil {
@@ -109,23 +110,3 @@ func (e *singleFunctionEvaluator) EvaluateFunction(ctx context.Context, req *pb.
         Log:    stderr.Bytes(),
     }, nil
 }
-
-type HealthChecker struct{}
-
-func NewHealthChecker() *HealthChecker {
-    return &HealthChecker{}
-}
-
-func (s *HealthChecker) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
-    klog.Info("Serving the Check request for health check")
-    return &grpc_health_v1.HealthCheckResponse{
-        Status: grpc_health_v1.HealthCheckResponse_SERVING,
-    }, nil
-}
-
-func (s *HealthChecker) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
-    klog.Info("Serving the Watch request for health check")
-    return server.Send(&grpc_health_v1.HealthCheckResponse{
-        Status: grpc_health_v1.HealthCheckResponse_SERVING,
-    })
-}
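Note: with the duplicated HealthChecker deleted here, the wrapper server and the function runner register the same shared healthchecker package. A minimal sketch that exercises the shared implementation directly (illustrative, not part of the patch):

    package main

    import (
        "context"
        "fmt"

        "github.com/GoogleContainerTools/kpt/porch/func/healthchecker"
        "google.golang.org/grpc/health/grpc_health_v1"
    )

    func main() {
        hc := healthchecker.NewHealthChecker()
        resp, err := hc.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
        if err != nil {
            panic(err)
        }
        fmt.Println(resp.Status) // SERVING
    }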