From 45ce8768cfc59b39d360b6a98e043261f1486cbd Mon Sep 17 00:00:00 2001 From: Frederic Wilhelm Date: Tue, 4 Feb 2025 18:49:29 +0100 Subject: [PATCH] wip --- Makefile | 2 +- api/v1alpha1/condition_types.go | 36 +++ api/v1alpha1/constants.go | 2 +- cmd/main.go | 3 +- .../component/component_controller.go | 44 +-- .../component/component_controller_test.go | 99 +++++-- internal/controller/component/suite_test.go | 58 +++- .../resource/resource_controller.go | 264 ++++++------------ .../resource/resource_controller_test.go | 78 ++---- internal/controller/resource/suite_test.go | 65 +++-- .../controller/snapshot/controller_test.go | 14 +- pkg/mocks/snapshot.go | 5 +- pkg/ocm/artifact.go | 42 +-- pkg/snapshot/repository.go | 39 ++- pkg/snapshot/snapshot.go | 30 ++ 15 files changed, 438 insertions(+), 343 deletions(-) diff --git a/Makefile b/Makefile index b5fdd792..3deba5f3 100644 --- a/Makefile +++ b/Makefile @@ -189,7 +189,7 @@ $(ENVTEST): $(LOCALBIN) .PHONY: zot-registry zot-registry: $(LOCALBIN) # Download zot registry binary locally if necessary. - @wget "https://github.com/project-zot/zot/releases/download/$(ZOT_VERSION)/zot-$(OS)-$(ARCH)-minimal" \ + wget "https://github.com/project-zot/zot/releases/download/$(ZOT_VERSION)/zot-$(OS)-$(ARCH)-minimal" \ -O $(LOCALBIN)/zot-registry \ && chmod u+x $(LOCALBIN)/zot-registry diff --git a/api/v1alpha1/condition_types.go b/api/v1alpha1/condition_types.go index d7f0f2d3..fbd031f5 100644 --- a/api/v1alpha1/condition_types.go +++ b/api/v1alpha1/condition_types.go @@ -63,18 +63,54 @@ const ( // ReconcileArtifactFailedReason is used when we fail in creating an Artifact. ReconcileArtifactFailedReason = "ReconcileArtifactFailed" + // MarshalFailedReason is used when we fail to marshal a struct. + MarshalFailedReason = "MarshalFailed" + + // CreateOCIRepositoryNameFailedReason is used when we fail to create an OCI repository name. + CreateOCIRepositoryNameFailedReason = "CreateOCIRepositoryNameFailed" + + // CreateOCIRepositoryFailedReason is used when we fail to create a OCI repository. + CreateOCIRepositoryFailedReason = "CreateOCIRepositoryFailed" + + // CreateSnapshotFailedReason is used when we fail to create a snapshot. + CreateSnapshotFailedReason = "CreateSnapshotFailed" + // GetArtifactFailedReason is used when we fail in getting an Artifact. GetArtifactFailedReason = "GetArtifactFailed" + // GetSnapshotFailedReason is used when we fail in getting a Snapshot. + GetSnapshotFailedReason = "GetSnapshotFailed" + // ResolveResourceFailedReason is used when we fail in resolving a resource. ResolveResourceFailedReason = "ResolveResourceFailed" // GetResourceAccessFailedReason is used when we fail in getting a resource access(es). GetResourceAccessFailedReason = "GetResourceAccessFailed" + // GetBlobAccessFailedReason is used when we fail to get a blob access. + GetBlobAccessFailedReason = "GetBlobAccessFailed" + + // VerifyResourceFailedReason is used when we fail to verify a resource. + VerifyResourceFailedReason = "VerifyResourceFailed" + + // GetResourceFailedReason is used when we fail to get the resource. + GetResourceFailedReason = "GetResourceFailed" + + // PushSnapshotFailedReason is used when we fail to push a snapshot. + PushSnapshotFailedReason = "PushSnapshotFailed" + + // FetchSnapshotFailedReason is used when we fail to fetch a snapshot. + FetchSnapshotFailedReason = "FetchSnapshotFailed" + + // DeleteSnapshotFailedReason is used when we fail to delete a snapshot. + DeleteSnapshotFailedReason = "DeleteSnapshotFailed" + // GetComponentForArtifactFailedReason is used when we fail in getting a component for an artifact. GetComponentForArtifactFailedReason = "GetComponentForArtifactFailed" + // GetComponentForSnapshotFailedReason is used when we fail in getting a component for a snapshot. + GetComponentForSnapshotFailedReason = "GetComponentForSnapshotFailed" + // StatusSetFailedReason is used when we fail to set the component status. StatusSetFailedReason = "StatusSetFailed" diff --git a/api/v1alpha1/constants.go b/api/v1alpha1/constants.go index 889f6e23..a6240dc6 100644 --- a/api/v1alpha1/constants.go +++ b/api/v1alpha1/constants.go @@ -37,7 +37,7 @@ const ( // Finalizers for controllers. const ( - // TODO: Remove ArtifactFinalizer + // TODO: Remove ArtifactFinalizer. // ArtifactFinalizer is the finalizer that is added to artifacts created by the ocm controllers. ArtifactFinalizer = "finalizers.ocm.software/artifact" diff --git a/cmd/main.go b/cmd/main.go index 4da4e577..2f3c60bf 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -211,7 +211,8 @@ func main() { Scheme: mgr.GetScheme(), EventRecorder: eventsRecorder, }, - Storage: storage, + Registry: registry, + Storage: storage, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Resource") os.Exit(1) diff --git a/internal/controller/component/component_controller.go b/internal/controller/component/component_controller.go index 618e16e8..ce8c0ff2 100644 --- a/internal/controller/component/component_controller.go +++ b/internal/controller/component/component_controller.go @@ -22,7 +22,6 @@ import ( "fmt" "github.com/Masterminds/semver/v3" - "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" "github.com/fluxcd/pkg/runtime/patch" "github.com/mandelsoft/goutils/sliceutils" @@ -59,22 +58,24 @@ var _ ocm.Reconciler = (*Reconciler)(nil) // SetupWithManager sets up the controller with the Manager. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). + // TODO: Check if we should watch for the snapshots that are created by this controller For(&v1alpha1.Component{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(r) } // +kubebuilder:rbac:groups=delivery.ocm.software,resources=components,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=delivery.ocm.software,resources=components/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=delivery.ocm.software,resources=components/finalizers,verbs=update - -// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts/finalizers,verbs=update +// +kubebuilder:rbac:groups=delivery.ocm.software,resources=components/finalizers,verbs=updat // +kubebuilder:rbac:groups="",resources=secrets;configmaps;serviceaccounts,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=serviceaccounts/token,verbs=create // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// TODO: Remove +// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts/finalizers,verbs=update + // Reconcile the component object. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, retErr error) { component := &v1alpha1.Component{} @@ -141,8 +142,8 @@ func (r *Reconciler) reconcile(ctx context.Context, component *v1alpha1.Componen // not ready as well. // However, as the component is hard-dependant on the ocmrepository, we decided to mark it not ready as well. if !conditions.IsReady(repo) { - conditions.Delete(component, meta.ReconcilingCondition) - conditions.MarkFalse(component, meta.ReadyCondition, v1alpha1.RepositoryIsNotReadyReason, "repository is not ready") + logger.Info("repository is not ready", "name", component.Spec.RepositoryRef.Name) + status.MarkNotReady(r.EventRecorder, component, v1alpha1.RepositoryIsNotReadyReason, "repository is not ready yet") return ctrl.Result{Requeue: true}, nil } @@ -246,36 +247,37 @@ func (r *Reconciler) reconcileComponent(ctx context.Context, octx ocmctx.Context // TODO: Can I check beforehand if the CD is already downloaded and in the OCI Registry (cached)? // Compare digest/hash from manifest of the CD from the source storage + logger.Info("pushing descriptors to storage") ociRepositoryName, err := snapshot.CreateRepositoryName(component.Spec.RepositoryRef.Name, component.GetName()) if err != nil { - status.MarkNotReady(r.EventRecorder, component, v1alpha1.ReconcileArtifactFailedReason, err.Error()) + status.MarkNotReady(r.EventRecorder, component, v1alpha1.CreateOCIRepositoryNameFailedReason, err.Error()) return ctrl.Result{}, err } ociRepository, err := r.Registry.NewRepository(ctx, ociRepositoryName) if err != nil { - status.MarkNotReady(r.EventRecorder, component, v1alpha1.ReconcileArtifactFailedReason, err.Error()) + status.MarkNotReady(r.EventRecorder, component, v1alpha1.CreateOCIRepositoryFailedReason, err.Error()) return ctrl.Result{}, err } - descriptorBytes, err := yaml.Marshal(descriptors) + descriptorsBytes, err := yaml.Marshal(descriptors) if err != nil { - status.MarkNotReady(r.EventRecorder, component, v1alpha1.ReconcileArtifactFailedReason, err.Error()) + status.MarkNotReady(r.EventRecorder, component, v1alpha1.MarshalFailedReason, err.Error()) return ctrl.Result{}, err } - manifestDigest, err := ociRepository.PushSnapshot(ctx, version, descriptorBytes) + manifestDigest, err := ociRepository.PushSnapshot(ctx, version, descriptorsBytes) if err != nil { status.MarkNotReady(r.EventRecorder, component, v1alpha1.ReconcileArtifactFailedReason, err.Error()) return ctrl.Result{}, err } - // Create snapshot - snapshotCR := snapshot.Create(component, ociRepositoryName, manifestDigest.String(), version, digest.FromBytes(descriptorBytes).String(), int64(len(descriptorBytes))) + logger.Info("creating snapshot") + snapshotCR := snapshot.Create(component, ociRepositoryName, manifestDigest.String(), version, digest.FromBytes(descriptorsBytes).String(), int64(len(descriptorsBytes))) if _, err = controllerutil.CreateOrUpdate(ctx, r.GetClient(), &snapshotCR, func() error { if snapshotCR.ObjectMeta.CreationTimestamp.IsZero() { @@ -284,24 +286,24 @@ func (r *Reconciler) reconcileComponent(ctx context.Context, octx ocmctx.Context } } + component.Status.SnapshotRef = corev1.LocalObjectReference{ + Name: snapshotCR.GetName(), + } + return nil }); err != nil { - status.MarkNotReady(r.EventRecorder, component, v1alpha1.ReconcileArtifactFailedReason, err.Error()) + status.MarkNotReady(r.EventRecorder, component, v1alpha1.CreateSnapshotFailedReason, err.Error()) return ctrl.Result{}, err } - // Update component status + logger.Info("updating status") component.Status.Component = v1alpha1.ComponentInfo{ RepositorySpec: repository.Spec.RepositorySpec, Component: component.Spec.Component, Version: version, } - component.Status.SnapshotRef = corev1.LocalObjectReference{ - Name: snapshotCR.GetName(), - } - component.Status.EffectiveOCMConfig = configs status.MarkReady(r.EventRecorder, component, "Applied version %s", version) diff --git a/internal/controller/component/component_controller_test.go b/internal/controller/component/component_controller_test.go index 562de82f..3b5bda50 100644 --- a/internal/controller/component/component_controller_test.go +++ b/internal/controller/component/component_controller_test.go @@ -19,13 +19,17 @@ package component import ( "context" "fmt" + "io" "os" "time" . "github.com/mandelsoft/goutils/testutils" + "github.com/mandelsoft/vfs/pkg/vfs" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" . "ocm.software/ocm/api/helper/builder" + "ocm.software/ocm/api/utils/accessobj" + "sigs.k8s.io/yaml" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/conditions" @@ -37,9 +41,12 @@ import ( environment "ocm.software/ocm/api/helper/env" "ocm.software/ocm/api/ocm/extensions/repositories/ctf" "ocm.software/ocm/api/utils/accessio" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest/komega" "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" + "github.com/open-component-model/ocm-k8s-toolkit/pkg/ocm" + "github.com/open-component-model/ocm-k8s-toolkit/pkg/snapshot" ) const ( @@ -133,20 +140,30 @@ var _ = Describe("Component Controller", func() { Status: v1alpha1.ComponentStatus{}, } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + DeferCleanup(func(ctx SpecContext) { + Expect(k8sClient.Delete(ctx, component, client.PropagationPolicy(metav1.DeletePropagationForeground))).To(Succeed()) + }) - By("check that snapshot has been created successfully") + By("checking that the component has been reconciled successfully") + Eventually(komega.Object(component), "5m").Should( + HaveField("Status.ObservedGeneration", Equal(int64(1)))) - Eventually(komega.Object(component), "15s").Should( - HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + By("checking that the snapshot has been created successfully") + Expect(component).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + snapshotComponent := Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, component)) - By("checking if the snapshot can be received") - snapshot := &v1alpha1.Snapshot{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: component.Namespace, - Name: component.Status.SnapshotRef.Name, - }, - } - Eventually(komega.Get(snapshot)).Should(Succeed()) + By("checking that the snapshot contains the correct content") + snapshotRepository := Must(registry.NewRepository(ctx, snapshotComponent.Spec.Repository)) + snapshotComponentContentReader := Must(snapshotRepository.FetchSnapshot(ctx, snapshotComponent.GetDigest())) + snapshotComponentContent := Must(io.ReadAll(snapshotComponentContentReader)) + snapshotDescriptors := &ocm.Descriptors{} + MustBeSuccessful(yaml.Unmarshal(snapshotComponentContent, snapshotDescriptors)) + + repo := Must(ctf.Open(env, accessobj.ACC_WRITABLE, ctfpath, vfs.FileMode(vfs.O_RDWR), env)) + cv := Must(repo.LookupComponentVersion(Component, Version1)) + expectedDescriptors := Must(ocm.ListComponentDescriptors(ctx, cv, repo)) + + Expect(snapshotDescriptors).To(YAMLEqual(expectedDescriptors)) }) It("does not reconcile when the repository is not ready", func() { @@ -172,6 +189,9 @@ var _ = Describe("Component Controller", func() { Status: v1alpha1.ComponentStatus{}, } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + DeferCleanup(func(ctx SpecContext) { + Expect(k8sClient.Delete(ctx, component, client.PropagationPolicy(metav1.DeletePropagationForeground))).To(Succeed()) + }) By("check that no snapshot has been created") Eventually(komega.Object(component), "15s").Should( @@ -197,11 +217,17 @@ var _ = Describe("Component Controller", func() { Status: v1alpha1.ComponentStatus{}, } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + DeferCleanup(func(ctx SpecContext) { + Expect(k8sClient.Delete(ctx, component, client.PropagationPolicy(metav1.DeletePropagationForeground))).To(Succeed()) + }) - By("check that snapshot has been created successfully") + By("checking that the component has been reconciled successfully") + Eventually(komega.Object(component), "5m").Should( + HaveField("Status.ObservedGeneration", Equal(int64(1)))) - Eventually(komega.Object(component), "15s").Should( - HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + By("checking that the snapshot has been created successfully") + Expect(component).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, component)) Expect(k8sClient.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, component)).To(Succeed()) Expect(component.Status.Component.Version).To(Equal(Version1)) @@ -254,10 +280,17 @@ var _ = Describe("Component Controller", func() { Status: v1alpha1.ComponentStatus{}, } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + DeferCleanup(func(ctx SpecContext) { + Expect(k8sClient.Delete(ctx, component, client.PropagationPolicy(metav1.DeletePropagationForeground))).To(Succeed()) + }) - By("check that snapshot has been created successfully") + By("checking that the component has been reconciled successfully") + Eventually(komega.Object(component), "5m").Should( + HaveField("Status.ObservedGeneration", Equal(int64(1)))) - Eventually(komega.Object(component), "15s").Should(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + By("checking that the snapshot has been created successfully") + Expect(component).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, component)) Expect(k8sClient.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, component)).To(Succeed()) Expect(component.Status.Component.Version).To(Equal("0.0.3")) @@ -303,9 +336,17 @@ var _ = Describe("Component Controller", func() { }, } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + DeferCleanup(func(ctx SpecContext) { + Expect(k8sClient.Delete(ctx, component, client.PropagationPolicy(metav1.DeletePropagationForeground))).To(Succeed()) + }) - By("check that snapshot has been created successfully") - Eventually(komega.Object(component), "15s").Should(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + By("checking that the component has been reconciled successfully") + Eventually(komega.Object(component), "5m").Should( + HaveField("Status.ObservedGeneration", Equal(int64(1)))) + + By("checking that the snapshot has been created successfully") + Expect(component).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, component)) Expect(k8sClient.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, component)).To(Succeed()) Expect(component.Status.Component.Version).To(Equal("0.0.3")) @@ -349,11 +390,17 @@ var _ = Describe("Component Controller", func() { Status: v1alpha1.ComponentStatus{}, } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + DeferCleanup(func(ctx SpecContext) { + Expect(k8sClient.Delete(ctx, component, client.PropagationPolicy(metav1.DeletePropagationForeground))).To(Succeed()) + }) - By("check that snapshot has been created successfully") + By("checking that the component has been reconciled successfully") + Eventually(komega.Object(component), "5m").Should( + HaveField("Status.ObservedGeneration", Equal(int64(1)))) - Eventually(komega.Object(component), "15s").Should( - HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + By("checking that the snapshot has been created successfully") + Expect(component).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, component)) Expect(k8sClient.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, component)).To(Succeed()) Expect(component.Status.Component.Version).To(Equal("0.0.3")) @@ -365,7 +412,7 @@ var _ = Describe("Component Controller", func() { Expect(k8sClient.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, component)).To(Succeed()) return component.Status.Component.Version == "0.0.2" - }).WithTimeout(15 * time.Second).Should(BeTrue()) + }).WithTimeout(60 * time.Second).Should(BeTrue()) }) }) @@ -559,6 +606,14 @@ var _ = Describe("Component Controller", func() { } Expect(k8sClient.Create(ctx, component)).To(Succeed()) + By("checking that the component has been reconciled successfully") + Eventually(komega.Object(component), "5m").Should( + HaveField("Status.ObservedGeneration", Equal(int64(1)))) + + By("checking that the snapshot has been created successfully") + Expect(component).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) + Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, component)) + Eventually(komega.Object(component), "15s").Should( HaveField("Status.EffectiveOCMConfig", ConsistOf( v1alpha1.OCMConfiguration{ diff --git a/internal/controller/component/suite_test.go b/internal/controller/component/suite_test.go index bd8f01d1..96fc491e 100644 --- a/internal/controller/component/suite_test.go +++ b/internal/controller/component/suite_test.go @@ -16,10 +16,15 @@ package component import ( "context" "fmt" + "net/http" + "os" + "os/exec" "path/filepath" "runtime" "testing" + "time" + . "github.com/mandelsoft/goutils/testutils" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" artifactv1 "github.com/openfluxcd/artifact/api/v1alpha1" @@ -37,8 +42,8 @@ import ( metricserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" - "github.com/open-component-model/ocm-k8s-toolkit/pkg/mocks" "github.com/open-component-model/ocm-k8s-toolkit/pkg/ocm" + "github.com/open-component-model/ocm-k8s-toolkit/pkg/snapshot" ) // +kubebuilder:scaffold:imports @@ -50,6 +55,9 @@ var cfg *rest.Config var k8sClient client.Client var k8sManager ctrl.Manager var testEnv *envtest.Environment +var zotCmd *exec.Cmd +var registry *snapshot.Registry +var zotRootDir string func TestControllers(t *testing.T) { RegisterFailHandler(Fail) @@ -75,8 +83,10 @@ var _ = BeforeSuite(func() { fmt.Sprintf("1.30.0-%s-%s", runtime.GOOS, runtime.GOARCH)), } + var err error + // cfg is defined in this file globally. - cfg, err := testEnv.Start() + cfg, err = testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) DeferCleanup(testEnv.Stop) @@ -100,8 +110,36 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - mockRegistry, err := mocks.NewRegistry("") - Expect(err).NotTo(HaveOccurred()) + // Create zot-registry config file + zotRootDir = Must(os.MkdirTemp("", "")) + zotAddress := "0.0.0.0" + zotPort := "8080" + zotConfig := []byte(fmt.Sprintf(`{"storage":{"rootDirectory":"%s"},"http":{"address":"%s","port": "%s"}}`, zotRootDir, zotAddress, zotPort)) + zotConfigFile := filepath.Join(zotRootDir, "config.json") + MustBeSuccessful(os.WriteFile(zotConfigFile, zotConfig, 0644)) + + // Start zot-registry + zotCmd = exec.Command(filepath.Join("..", "..", "..", "bin", "zot-registry"), "serve", zotConfigFile) + err = zotCmd.Start() + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to start Zot")) + + // Wait for Zot to be ready + Eventually(func() error { + resp, err := http.Get(fmt.Sprintf("http://%s:%s/v2/", zotAddress, zotPort)) + if err != nil { + return fmt.Errorf("could not connect to Zot") + } + + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil + }, 30*time.Second, 1*time.Second).Should(Succeed(), "Zot registry did not start in time") + + registry, err = snapshot.NewRegistry(fmt.Sprintf("%s:%s", zotAddress, zotPort)) + registry.PlainHTTP = true Expect((&Reconciler{ BaseReconciler: &ocm.BaseReconciler{ @@ -112,7 +150,7 @@ var _ = BeforeSuite(func() { IncludeObject: true, }, }, - Registry: mockRegistry, + Registry: registry, }).SetupWithManager(k8sManager)).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) @@ -130,3 +168,13 @@ var _ = BeforeSuite(func() { Expect(k8sManager.Start(ctx)).To(Succeed()) }() }) + +var _ = AfterSuite(func() { + if zotCmd != nil { + err := zotCmd.Process.Kill() + Expect(err).NotTo(HaveOccurred(), "Failed to stop Zot registry") + + // Clean up root directory + MustBeSuccessful(os.RemoveAll(zotRootDir)) + } +}) diff --git a/internal/controller/resource/resource_controller.go b/internal/controller/resource/resource_controller.go index 81b28216..c93101d4 100644 --- a/internal/controller/resource/resource_controller.go +++ b/internal/controller/resource/resource_controller.go @@ -21,17 +21,14 @@ import ( "encoding/json" "errors" "fmt" - "os" - "path/filepath" - "strings" "github.com/fluxcd/pkg/runtime/conditions" "github.com/fluxcd/pkg/runtime/patch" + "github.com/opencontainers/go-digest" "github.com/openfluxcd/controller-manager/storage" "k8s.io/apimachinery/pkg/types" "ocm.software/ocm/api/datacontext" "ocm.software/ocm/api/ocm/compdesc" - "ocm.software/ocm/api/ocm/extensions/download" "ocm.software/ocm/api/ocm/resolvers" "ocm.software/ocm/api/ocm/selectors" "ocm.software/ocm/api/ocm/tools/signing" @@ -44,23 +41,22 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - artifactv1 "github.com/openfluxcd/artifact/api/v1alpha1" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" ocmctx "ocm.software/ocm/api/ocm" v1 "ocm.software/ocm/api/ocm/compdesc/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" - "github.com/open-component-model/ocm-k8s-toolkit/pkg/compression" "github.com/open-component-model/ocm-k8s-toolkit/pkg/ocm" + snapshotRegistry "github.com/open-component-model/ocm-k8s-toolkit/pkg/snapshot" "github.com/open-component-model/ocm-k8s-toolkit/pkg/status" ) type Reconciler struct { *ocm.BaseReconciler - Storage *storage.Storage + Storage *storage.Storage + Registry snapshotRegistry.RegistryType } var _ ocm.Reconciler = (*Reconciler)(nil) @@ -81,8 +77,8 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Resource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). - // Watch for artifacts-events that are owned by the resource controller - Owns(&artifactv1.Artifact{}). + // Watch for snapshot-events that are owned by the resource controller + Owns(&v1alpha1.Snapshot{}). // Watch for component-events that are referenced by resources Watches( &v1alpha1.Component{}, @@ -116,11 +112,6 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { // +kubebuilder:rbac:groups=delivery.ocm.software,resources=resources,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=delivery.ocm.software,resources=resources/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=delivery.ocm.software,resources=resources/finalizers,verbs=update - -// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=openfluxcd.ocm.software,resources=artifacts/finalizers,verbs=update func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { resource := &v1alpha1.Resource{} @@ -152,30 +143,12 @@ func (r *Reconciler) reconcileExists(ctx context.Context, resource *v1alpha1.Res return ctrl.Result{}, nil } - if !resource.GetDeletionTimestamp().IsZero() { - // TODO: This is a temporary solution until a artifact-reconciler is written to handle the deletion of artifacts - if err := ocm.RemoveArtifactForCollectable(ctx, r.Client, r.Storage, resource); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to remove artifact: %w", err) - } - - if removed := controllerutil.RemoveFinalizer(resource, v1alpha1.ArtifactFinalizer); removed { - if err := r.Update(ctx, resource); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to remove finalizer: %w", err) - } - } + if resource.GetDeletionTimestamp() != nil { + logger.Info("resource is being deleted and cannot be used", "name", resource.Name) return ctrl.Result{}, nil } - if added := controllerutil.AddFinalizer(resource, v1alpha1.ArtifactFinalizer); added { - err := r.Update(ctx, resource) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to add finalizer: %w", err) - } - - return ctrl.Result{Requeue: true}, nil - } - return r.reconcile(ctx, resource) } @@ -223,6 +196,7 @@ func (r *Reconciler) reconcileOCM(ctx context.Context, resource *v1alpha1.Resour return result, nil } +//nolint:funlen // we do not want to cut function at an arbitrary point func (r *Reconciler) reconcileResource(ctx context.Context, octx ocmctx.Context, resource *v1alpha1.Resource, component *v1alpha1.Component) (ctrl.Result, error) { logger := log.FromContext(ctx) logger.V(1).Info("reconciling resource") @@ -244,22 +218,26 @@ func (r *Reconciler) reconcileResource(ctx context.Context, octx ocmctx.Context, return ctrl.Result{}, err } - // Get artifact from component that contains component descriptor - artifactComponent := &artifactv1.Artifact{} - if err := r.Get(ctx, types.NamespacedName{ - // TODO: see https://github.com/open-component-model/ocm-project/issues/295 - Namespace: resource.GetNamespace(), - Name: component.Status.ArtifactRef.Name, - }, artifactComponent); err != nil { - status.MarkNotReady(r.EventRecorder, resource, v1alpha1.GetArtifactFailedReason, "Cannot get component artifact") + // Get snapshot from component that contains component descriptor + componentSnapshot, err := snapshotRegistry.GetSnapshotForOwner(ctx, r.Client, component) + if err != nil { + status.MarkNotReady(r.GetEventRecorder(), resource, v1alpha1.GetSnapshotFailedReason, err.Error()) - return ctrl.Result{}, fmt.Errorf("failed to get component artifact: %w", err) + return ctrl.Result{}, nil + } + + // Create repository from registry for snapshot + repositoryDescriptor, err := r.Registry.NewRepository(ctx, componentSnapshot.Spec.Repository) + if err != nil { + status.MarkNotReady(r.GetEventRecorder(), resource, v1alpha1.GetSnapshotFailedReason, err.Error()) + + return ctrl.Result{}, nil } // Get component descriptor set from artifact - cdSet, err := ocm.GetComponentSetForArtifact(r.Storage, artifactComponent) + cdSet, err := ocm.GetComponentSetForSnapshot(ctx, repositoryDescriptor, componentSnapshot) if err != nil { - status.MarkNotReady(r.EventRecorder, resource, v1alpha1.GetComponentForArtifactFailedReason, err.Error()) + status.MarkNotReady(r.EventRecorder, resource, v1alpha1.GetComponentForSnapshotFailedReason, err.Error()) return ctrl.Result{}, err } @@ -300,23 +278,76 @@ func (r *Reconciler) reconcileResource(ctx context.Context, octx ocmctx.Context, return ctrl.Result{}, err } - // revision is the digest of the resource. It is used to identify the resource in the storage (as filename) and to - // check if the resource is already present in the storage. - revision := resourceAccess.Meta().Digest.Value + // TODO: + // Problem: Do not re-download resources that are already present in the OCI registry + // Resolution: + // - Use resource-access-digest as OCI repository name + // - Check if OCI repository name exists + // - If yes, create manifest and point to the previous OCI layer blob + // - How? - // Get the artifact to check if it is already present while reconciling it - artifactStorage := r.Storage.NewArtifactFor(resource.GetKind(), resource.GetObjectMeta(), "", "") - if err := r.Client.Get(ctx, types.NamespacedName{Name: artifactStorage.Name, Namespace: artifactStorage.Namespace}, artifactStorage); err != nil { - if !apierrors.IsNotFound(err) { - status.MarkNotReady(r.EventRecorder, resource, v1alpha1.GetArtifactFailedReason, err.Error()) + // Get resource content + // No need to close the blob access as it will be closed automatically + blobAccess, err := getBlobAccess(ctx, resourceAccess) + if err != nil { + status.MarkNotReady(r.EventRecorder, resource, v1alpha1.GetBlobAccessFailedReason, err.Error()) - return ctrl.Result{}, fmt.Errorf("failed to get artifactStorage: %w", err) - } + return ctrl.Result{}, err } - err = reconcileArtifact(ctx, octx, r.Storage, resource, resourceAccess, revision, artifactStorage, func() error { return verifyResource(ctx, resourceAccess, cv, cd) }) + if err := verifyResource(ctx, resourceAccess, cv, cd); err != nil { + status.MarkNotReady(r.EventRecorder, resource, v1alpha1.VerifyResourceFailedReason, err.Error()) + + return ctrl.Result{}, err + } + + resourceContent, err := blobAccess.Get() + if err != nil { + status.MarkNotReady(r.EventRecorder, resource, v1alpha1.GetResourceFailedReason, err.Error()) + + return ctrl.Result{}, err + } + + // Create OCI repository + repositoryResourceName := resourceAccess.Meta().Digest.Value + repositoryResource, err := r.Registry.NewRepository(ctx, repositoryResourceName) + if err != nil { + status.MarkNotReady(r.GetEventRecorder(), resource, v1alpha1.GetComponentVersionFailedReason, err.Error()) + + return ctrl.Result{}, err + } + + // Push resource to OCI repository + manifestDigest, err := repositoryResource.PushSnapshot(ctx, resourceAccess.Meta().GetVersion(), resourceContent) if err != nil { - status.MarkNotReady(r.EventRecorder, resource, v1alpha1.ReconcileArtifactFailedReason, err.Error()) + status.MarkNotReady(r.GetEventRecorder(), resource, v1alpha1.PushSnapshotFailedReason, err.Error()) + + return ctrl.Result{}, err + } + + // Create respective snapshot CR + snapshotCR := snapshotRegistry.Create( + resource, + repositoryResourceName, + manifestDigest.String(), + resourceAccess.Meta().GetVersion(), + digest.FromBytes(resourceContent).String(), + int64(len(resourceContent))) + + if _, err = controllerutil.CreateOrUpdate(ctx, r.GetClient(), &snapshotCR, func() error { + if snapshotCR.ObjectMeta.CreationTimestamp.IsZero() { + if err := controllerutil.SetControllerReference(resource, &snapshotCR, r.GetScheme()); err != nil { + return fmt.Errorf("failed to set controller reference: %w", err) + } + } + + resource.Status.SnapshotRef = corev1.LocalObjectReference{ + Name: snapshotCR.GetName(), + } + + return nil + }); err != nil { + status.MarkNotReady(r.EventRecorder, component, v1alpha1.CreateSnapshotFailedReason, err.Error()) return ctrl.Result{}, err } @@ -437,123 +468,6 @@ func verifyResource(ctx context.Context, access ocmctx.ResourceAccess, cv ocmctx return nil } -// downloadResource downloads the resource from the resource access. -func downloadResource(ctx context.Context, octx ocmctx.Context, targetDir string, resource *v1alpha1.Resource, acc ocmctx.ResourceAccess, bAcc blobaccess.BlobAccess, -) (string, error) { - log.FromContext(ctx).V(1).Info("download resource") - - // Using a redirected resource acc to prevent redundant download - accessMock, err := ocm.NewRedirectedResourceAccess(acc, bAcc) - if err != nil { - return "", fmt.Errorf("failed to create redirected resource acc: %w", err) - } - - path, err := download.DownloadResource(octx, accessMock, filepath.Join(targetDir, resource.Name)) - if err != nil { - return "", fmt.Errorf("failed to download resource: %w", err) - } - - return path, nil -} - -// reconcileArtifact will download, verify, and reconcile the artifact in the storage if it is not already present in the storage. -// TODO: https://github.com/open-component-model/ocm-project/issues/297 -func reconcileArtifact( - ctx context.Context, - octx ocmctx.Context, - storage *storage.Storage, - resource *v1alpha1.Resource, - acc ocmctx.ResourceAccess, - revision string, - artifact *artifactv1.Artifact, - verifyFunc func() error, -) (retErr error) { - log.FromContext(ctx).V(1).Info("reconcile artifact") - - // Check if the artifact is already present and located in the storage - localPath := storage.LocalPath(artifact) - - // use the filename which is the revision as the artifact name - artifactPresent := storage.ArtifactExist(artifact) && strings.Split(filepath.Base(localPath), ".")[0] == revision - - // Init variables with default values in case the artifact is present - // If the artifact is present, the dirPath will be the directory of the local path to the directory - dirPath := filepath.Dir(localPath) - // If the artifact is already present, we do not want to archive it again - archiveFunc := func(_ *artifactv1.Artifact, _ string) error { - return nil - } - - // If the artifact is not present, we will verify and download the resource and provide it as artifact - //nolint:nestif // this is our main logic and we rather keep it in here - if !artifactPresent { - // No need to close the blob access as it will be closed automatically - bAcc, err := getBlobAccess(ctx, acc) - if err != nil { - return err - } - - // Check if resource can be verified - if err := verifyFunc(); err != nil { - return err - } - - // Target directory in which the resource is downloaded - tmp, err := os.MkdirTemp("", "resource-*") - if err != nil { - return fmt.Errorf("failed to create temporary directory: %w", err) - } - defer func() { - retErr = errors.Join(retErr, os.RemoveAll(tmp)) - }() - - path, err := downloadResource(ctx, octx, tmp, resource, acc, bAcc) - if err != nil { - return err - } - - // Since the artifact is not already present, an archive function is added to archive the downloaded resource in the storage - archiveFunc = func(art *artifactv1.Artifact, _ string) error { - logger := log.FromContext(ctx).WithValues("artifact", art.Name, "revision", revision, "path", path) - fi, err := os.Stat(path) - if err != nil { - return fmt.Errorf("failed to get file info: %w", err) - } - if fi.IsDir() { - logger.V(1).Info("archiving directory") - // Archive directory to storage - if err := storage.Archive(art, path, nil); err != nil { - return fmt.Errorf("failed to archive: %w", err) - } - } else { - if err := compression.AutoCompressAsGzipAndArchiveFile(ctx, art, storage, path); err != nil { - return fmt.Errorf("failed to auto compress and archive file: %w", err) - } - } - - resource.Status.ArtifactRef = corev1.LocalObjectReference{ - Name: art.Name, - } - - return nil - } - - // Overwrite the default dirPath with the temporary directory path that points to the downloaded resource - dirPath = tmp - } - - if err := storage.ReconcileStorage(ctx, resource); err != nil { - return fmt.Errorf("failed to reconcile resource storage: %w", err) - } - - // Provide artifact in storage - if err := storage.ReconcileArtifact(ctx, resource, revision, dirPath, revision, archiveFunc); err != nil { - return fmt.Errorf("failed to reconcile resource artifact: %w", err) - } - - return nil -} - // setResourceStatus updates the resource status with the all required information. func setResourceStatus(ctx context.Context, configs []v1alpha1.OCMConfiguration, resource *v1alpha1.Resource, resourceAccess ocmctx.ResourceAccess) error { log.FromContext(ctx).V(1).Info("updating resource status") diff --git a/internal/controller/resource/resource_controller_test.go b/internal/controller/resource/resource_controller_test.go index c93e7ba0..ff66484f 100644 --- a/internal/controller/resource/resource_controller_test.go +++ b/internal/controller/resource/resource_controller_test.go @@ -20,18 +20,18 @@ import ( "context" "fmt" "io" - "net/http" "os" + "path/filepath" "time" . "github.com/mandelsoft/goutils/testutils" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/opencontainers/go-digest" . "ocm.software/ocm/api/helper/builder" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "github.com/containers/image/v5/pkg/compression" "github.com/fluxcd/pkg/runtime/conditions" - "github.com/mandelsoft/filepath/pkg/filepath" "github.com/mandelsoft/vfs/pkg/osfs" "github.com/mandelsoft/vfs/pkg/vfs" "ocm.software/ocm/api/ocm/extensions/artifacttypes" @@ -43,7 +43,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest/komega" "sigs.k8s.io/yaml" - artifactv1 "github.com/openfluxcd/artifact/api/v1alpha1" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +51,7 @@ import ( "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" "github.com/open-component-model/ocm-k8s-toolkit/pkg/ocm" + "github.com/open-component-model/ocm-k8s-toolkit/pkg/snapshot" ) const ( @@ -63,7 +63,7 @@ const ( ComponentVersion = "1.0.0" ResourceObj = "test-resource" ResourceVersion = "1.0.0" - ResourceContent = "resource content" + ResourceContent = "some important content" ) var _ = Describe("Resource Controller", func() { @@ -125,34 +125,19 @@ var _ = Describe("Resource Controller", func() { By("checking that the resource has been reconciled successfully") Eventually(komega.Object(resource), "5m").Should( HaveField("Status.ObservedGeneration", Equal(int64(1)))) - Expect(resource).To(HaveField("Status.ArtifactRef.Name", Not(BeEmpty()))) + Expect(resource).To(HaveField("Status.SnapshotRef.Name", Not(BeEmpty()))) Expect(resource).To(HaveField("Status.Resource.Name", Equal(ResourceObj))) Expect(resource).To(HaveField("Status.Resource.Type", Equal(artifacttypes.PLAIN_TEXT))) Expect(resource).To(HaveField("Status.Resource.Version", Equal(ResourceVersion))) - By("checking that the artifact has been created successfully") - artifact := &artifactv1.Artifact{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: resource.Namespace, - Name: resource.Status.ArtifactRef.Name, - }, - } - Eventually(komega.Get(artifact)).Should(Succeed()) - - By("checking that the artifact server provides the resource") - r := Must(http.Get(artifact.Spec.URL)) - Expect(r).Should(HaveHTTPStatus(http.StatusOK)) + By("checking that the snapshot has been created successfully") + snapshotResource := Must(snapshot.GetSnapshotForOwner(ctx, k8sClient, resource)) - By("checking that the resource content is correct") - reader, decompressed, err := compression.AutoDecompress(r.Body) - Expect(decompressed).To(BeTrue()) - DeferCleanup(func() { - Expect(reader.Close()).To(Succeed()) - }) - Expect(err).To(BeNil()) - resourceContent := Must(io.ReadAll(reader)) - - Expect(string(resourceContent)).To(Equal(ResourceContent)) + By("checking that the snapshot contains the correct content") + snapshotRepository := Must(registry.NewRepository(ctx, snapshotResource.Spec.Repository)) + snapshotResourceContentReader := Must(snapshotRepository.FetchSnapshot(ctx, snapshotResource.GetDigest())) + snapshotResourceContent := Must(io.ReadAll(snapshotResourceContentReader)) + Expect(string(snapshotResourceContent)).To(Equal(ResourceContent)) }) }) }) @@ -200,37 +185,28 @@ func prepareComponent(ctx context.Context, env *Builder, ctfPath string) { } Expect(k8sClient.Create(ctx, component)).To(Succeed()) - By("creating an component artifact") - revision := ComponentObj + "-" + ComponentVersion - var artifactName string - Expect(globStorage.ReconcileArtifact(ctx, component, revision, tmpDirCd, revision+".tar.gz", - func(art *artifactv1.Artifact, _ string) error { - // Archive directory to storage - if err := globStorage.Archive(art, tmpDirCd, nil); err != nil { - return fmt.Errorf("unable to archive artifact to storage: %w", err) - } - - artifactName = art.Name + By("creating an component snapshot") + repositoryName := Must(snapshot.CreateRepositoryName(component.Spec.RepositoryRef.Name, component.GetName())) + repository := Must(registry.NewRepository(ctx, repositoryName)) - return nil - }, - )).To(Succeed()) + manifestDigest := Must(repository.PushSnapshot(ctx, ComponentVersion, dataCds)) + snapshotCR := snapshot.Create(component, repositoryName, manifestDigest.String(), ComponentVersion, digest.FromBytes(dataCds).String(), int64(len(dataCds))) - By("checking that the artifact has been created successfully") - artifact := &artifactv1.Artifact{ - ObjectMeta: metav1.ObjectMeta{ - Name: artifactName, - Namespace: Namespace, - }, - } - Eventually(komega.Get(artifact)).Should(Succeed()) + _ = Must(controllerutil.CreateOrUpdate(ctx, k8sClient, &snapshotCR, func() error { + if snapshotCR.ObjectMeta.CreationTimestamp.IsZero() { + if err := controllerutil.SetControllerReference(component, &snapshotCR, k8sClient.Scheme()); err != nil { + return fmt.Errorf("failed to set controller reference: %w", err) + } + } + return nil + })) By("updating the component object with the respective status") baseComponent := component.DeepCopy() ready := *conditions.TrueCondition("Ready", "ready", "message") ready.LastTransitionTime = metav1.Time{Time: time.Now()} baseComponent.Status.Conditions = []metav1.Condition{ready} - baseComponent.Status.ArtifactRef = corev1.LocalObjectReference{Name: artifact.ObjectMeta.Name} + baseComponent.Status.SnapshotRef = corev1.LocalObjectReference{Name: snapshotCR.GetName()} spec := Must(ctf.NewRepositorySpec(ctf.ACC_READONLY, ctfPath)) specData := Must(spec.MarshalJSON()) baseComponent.Status.Component = v1alpha1.ComponentInfo{ diff --git a/internal/controller/resource/suite_test.go b/internal/controller/resource/suite_test.go index c5f9a2a4..0381e611 100644 --- a/internal/controller/resource/suite_test.go +++ b/internal/controller/resource/suite_test.go @@ -19,6 +19,7 @@ import ( "io" "net/http" "os" + "os/exec" "path/filepath" "runtime" "testing" @@ -28,8 +29,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/openfluxcd/controller-manager/server" - "github.com/openfluxcd/controller-manager/storage" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -47,6 +46,7 @@ import ( "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" "github.com/open-component-model/ocm-k8s-toolkit/pkg/ocm" + "github.com/open-component-model/ocm-k8s-toolkit/pkg/snapshot" ) // +kubebuilder:scaffold:imports @@ -54,16 +54,13 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -const ( - ARTIFACT_PATH = "ocm-k8s-artifactstore--*" - ARTIFACT_SERVER = "localhost:8081" -) - var cfg *rest.Config var k8sClient client.Client var k8sManager ctrl.Manager var testEnv *envtest.Environment -var globStorage *storage.Storage +var zotCmd *exec.Cmd +var registry *snapshot.Registry +var zotRootDir string func TestControllers(t *testing.T) { RegisterFailHandler(Fail) @@ -96,8 +93,6 @@ var _ = BeforeSuite(func() { }, ErrorIfCRDPathMissing: true, - CRDs: []*apiextensionsv1.CustomResourceDefinition{artifactCRD}, - // The BinaryAssetsDirectory is only required if you want to run the tests directly // without call the makefile target test. If not informed it will look for the // default path defined in controller-runtime which is /usr/local/kubebuilder/. @@ -132,10 +127,36 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - tmpdir := Must(os.MkdirTemp("", ARTIFACT_PATH)) - address := ARTIFACT_SERVER - globStorage = Must(server.NewStorage(k8sClient, testEnv.Scheme, tmpdir, address, 0, 0)) - artifactServer := Must(server.NewArtifactServer(tmpdir, address, time.Millisecond)) + // Create zot-registry config file + zotRootDir = Must(os.MkdirTemp("", "")) + zotAddress := "0.0.0.0" + zotPort := "8081" + zotConfig := []byte(fmt.Sprintf(`{"storage":{"rootDirectory":"%s"},"http":{"address":"%s","port": "%s"}}`, zotRootDir, zotAddress, zotPort)) + zotConfigFile := filepath.Join(zotRootDir, "config.json") + MustBeSuccessful(os.WriteFile(zotConfigFile, zotConfig, 0644)) + + // Start zot-registry + zotCmd = exec.Command(filepath.Join("..", "..", "..", "bin", "zot-registry"), "serve", zotConfigFile) + err = zotCmd.Start() + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to start Zot")) + + // Wait for Zot to be ready + Eventually(func() error { + resp, err := http.Get(fmt.Sprintf("http://%s:%s/v2/", zotAddress, zotPort)) + if err != nil { + return fmt.Errorf("could not connect to Zot") + } + + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + return nil + }, 30*time.Second, 1*time.Second).Should(Succeed(), "Zot registry did not start in time") + + registry, err = snapshot.NewRegistry(fmt.Sprintf("%s:%s", zotAddress, zotPort)) + registry.PlainHTTP = true Expect((&Reconciler{ BaseReconciler: &ocm.BaseReconciler{ @@ -146,18 +167,24 @@ var _ = BeforeSuite(func() { IncludeObject: true, }, }, - Storage: globStorage, + Registry: registry, }).SetupWithManager(k8sManager)).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) DeferCleanup(cancel) - go func() { - defer GinkgoRecover() - Expect(artifactServer.Start(ctx)).To(Succeed()) - }() go func() { defer GinkgoRecover() Expect(k8sManager.Start(ctx)).To(Succeed()) }() }) + +var _ = AfterSuite(func() { + if zotCmd != nil { + err := zotCmd.Process.Kill() + Expect(err).NotTo(HaveOccurred(), "Failed to stop Zot registry") + + // Clean up root directory + MustBeSuccessful(os.RemoveAll(zotRootDir)) + } +}) diff --git a/internal/controller/snapshot/controller_test.go b/internal/controller/snapshot/controller_test.go index d1bb0442..0d5f8c6f 100644 --- a/internal/controller/snapshot/controller_test.go +++ b/internal/controller/snapshot/controller_test.go @@ -7,7 +7,6 @@ import ( . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - ocmmetav1 "ocm.software/ocm/api/ocm/compdesc/meta/v1" "sigs.k8s.io/controller-runtime/pkg/reconcile" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,6 +15,7 @@ import ( "github.com/open-component-model/ocm-k8s-toolkit/pkg/ocm" ) +// TODO: Create tests var _ = Describe("Snapshot Controller", func() { Context("When reconciling a resource", func() { const resourceName = "test-resource" @@ -33,17 +33,19 @@ var _ = Describe("Snapshot Controller", func() { err := k8sClient.Get(ctx, typeNamespacedName, snapshot) if err != nil && errors.IsNotFound(err) { resource := &deliveryv1alpha1.Snapshot{ - TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ Name: resourceName, Namespace: "default", }, Spec: deliveryv1alpha1.SnapshotSpec{ - Identity: ocmmetav1.Identity{ - "name": "some-name", + Repository: "test-repository", + Digest: "sha256:test-digest", + Blob: deliveryv1alpha1.BlobInfo{ + Digest: "sha256:test-digest", + Tag: "1.0.0", + Size: 0, }, - Digest: "digest", - Tag: "tag", + Suspend: false, }, } Expect(k8sClient.Create(ctx, resource)).To(Succeed()) diff --git a/pkg/mocks/snapshot.go b/pkg/mocks/snapshot.go index ab83f047..080ec115 100644 --- a/pkg/mocks/snapshot.go +++ b/pkg/mocks/snapshot.go @@ -2,6 +2,7 @@ package mocks import ( "context" + "io" "github.com/opencontainers/go-digest" "sigs.k8s.io/controller-runtime/pkg/log" @@ -33,10 +34,10 @@ func (r *Repository) PushSnapshot(ctx context.Context, _ string, _ []byte) (dige return digest.FromString("mock"), nil } -func (r *Repository) FetchSnapshot(ctx context.Context, _ string) ([]byte, error) { +func (r *Repository) FetchSnapshot(ctx context.Context, _ string) (io.ReadCloser, error) { log.FromContext(ctx).Info("mocking snapshot fetch") - return []byte{}, nil + return nil, nil } func (r *Repository) DeleteSnapshot(ctx context.Context, _ string) error { diff --git a/pkg/ocm/artifact.go b/pkg/ocm/artifact.go index 48a85a64..127584dd 100644 --- a/pkg/ocm/artifact.go +++ b/pkg/ocm/artifact.go @@ -16,45 +16,19 @@ import ( ctrl "sigs.k8s.io/controller-runtime/pkg/client" "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" + "github.com/open-component-model/ocm-k8s-toolkit/pkg/snapshot" ) -// GetComponentSetForArtifact returns the component descriptor set for the given artifact. -func GetComponentSetForArtifact(storage *storage.Storage, artifact *artifactv1.Artifact) (_ *compdesc.ComponentVersionSet, retErr error) { - tmp, err := os.MkdirTemp("", "component-*") +// GetComponentSetForSnapshot returns the component descriptor set for the given artifact. +func GetComponentSetForSnapshot(ctx context.Context, repository snapshot.RepositoryType, snapshotResource *v1alpha1.Snapshot) (_ *compdesc.ComponentVersionSet, retErr error) { + reader, err := repository.FetchSnapshot(ctx, snapshotResource.GetDigest()) if err != nil { - return nil, fmt.Errorf("failed to create temporary directory: %w", err) + return nil, err } - defer func() { - retErr = errors.Join(retErr, os.RemoveAll(tmp)) - }() - - // Instead of using the http-functionality of the storage-server, we use the storage directly for performance reasons. - // This assumes that the controllers and the storage are running in the same pod. - unlock, err := storage.Lock(artifact) - if err != nil { - return nil, fmt.Errorf("failed to lock artifact: %w", err) - } - defer unlock() - - filePath := filepath.Join(tmp, v1alpha1.OCMComponentDescriptorList) - - if err := storage.CopyToPath(artifact, v1alpha1.OCMComponentDescriptorList, filePath); err != nil { - return nil, fmt.Errorf("failed to copy artifact to path: %w", err) - } - - // Read component descriptor list - file, err := os.Open(filePath) - if err != nil { - return nil, fmt.Errorf("failed to open component descriptor: %w", err) - } - defer func() { - retErr = errors.Join(retErr, file.Close()) - }() // Get component descriptor set cds := &Descriptors{} - - if err := yaml.NewYAMLToJSONDecoder(file).Decode(cds); err != nil { + if err := yaml.NewYAMLToJSONDecoder(reader).Decode(cds); err != nil { return nil, fmt.Errorf("failed to unmarshal component descriptors: %w", err) } @@ -136,3 +110,7 @@ func RemoveArtifactForCollectable( return nil } + +func GetComponentSetForArtifact(_ *storage.Storage, _ *artifactv1.Artifact) (*compdesc.ComponentVersionSet, error) { + return nil, nil +} diff --git a/pkg/snapshot/repository.go b/pkg/snapshot/repository.go index 5b074a90..e7f80ff9 100644 --- a/pkg/snapshot/repository.go +++ b/pkg/snapshot/repository.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "github.com/mitchellh/hashstructure/v2" "github.com/opencontainers/go-digest" @@ -23,7 +24,7 @@ type RepositoryType interface { // PushSnapshot pushes the blob to its repository. It returns the manifest-digest to retrieve the blob. PushSnapshot(ctx context.Context, reference string, blob []byte) (digest.Digest, error) - FetchSnapshot(ctx context.Context, reference string) ([]byte, error) + FetchSnapshot(ctx context.Context, reference string) (io.ReadCloser, error) DeleteSnapshot(ctx context.Context, digest string) error } @@ -96,22 +97,46 @@ func (r *Repository) PushSnapshot(ctx context.Context, tag string, blob []byte) return "", fmt.Errorf("oci: error pushing manifest: %w", err) } - // Tag manifest + logger.Info("tagging OCI manifest") if err := r.Tag(ctx, manifestDescriptor, tag); err != nil { return "", fmt.Errorf("oci: error tagging manifest: %w", err) } + logger.Info("finished pushing snapshot") + return manifestDigest, nil } -func (r *Repository) FetchSnapshot(_ context.Context, _ string) ([]byte, error) { - return []byte{}, nil +func (r *Repository) FetchSnapshot(ctx context.Context, manifestDigest string) (io.ReadCloser, error) { + // Fetch manifest descriptor to get manifest. + manifestDescriptor, _, err := r.FetchReference(ctx, manifestDigest) + if err != nil { + return nil, fmt.Errorf("oci: error fetching manifest: %w", err) + } + + // Fetch manifest to get layer[0] descriptor. + manifestReader, err := r.Fetch(ctx, manifestDescriptor) + if err != nil { + return nil, fmt.Errorf("oci: error fetching manifest: %w", err) + } + + var manifest ociV1.Manifest + if err := json.NewDecoder(manifestReader).Decode(&manifest); err != nil { + return nil, fmt.Errorf("oci: error parsing manifest: %w", err) + } + + // We only expect single layer artifacts. + if len(manifest.Layers) != 1 { + return nil, fmt.Errorf("oci: expected 1 layer, got %d", len(manifest.Layers)) + } + + return r.Fetch(ctx, manifest.Layers[0]) } -func (r *Repository) DeleteSnapshot(ctx context.Context, digestString string) error { - manifestDescriptor, _, err := r.FetchReference(ctx, digestString) +func (r *Repository) DeleteSnapshot(ctx context.Context, manifestDigest string) error { + manifestDescriptor, _, err := r.FetchReference(ctx, manifestDigest) if err != nil { - return fmt.Errorf("error fetching manifest: %w", err) + return fmt.Errorf("oci: error fetching manifest: %w", err) } return r.Delete(ctx, manifestDescriptor) diff --git a/pkg/snapshot/snapshot.go b/pkg/snapshot/snapshot.go index c68aea6e..bbbeb8fa 100644 --- a/pkg/snapshot/snapshot.go +++ b/pkg/snapshot/snapshot.go @@ -1,11 +1,16 @@ package snapshot import ( + "context" + "errors" "fmt" "strings" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation" + "sigs.k8s.io/controller-runtime/pkg/client" + errorsK8s "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-component-model/ocm-k8s-toolkit/api/v1alpha1" @@ -40,3 +45,28 @@ func Create(owner v1alpha1.SnapshotWriter, ociRepository, manifestDigest, blobVe Status: v1alpha1.SnapshotStatus{}, } } + +func GetSnapshotForOwner(ctx context.Context, clientK8s client.Client, owner any) (*v1alpha1.Snapshot, error) { + ownerSnapshot, ok := owner.(v1alpha1.SnapshotWriter) + if !ok { + return nil, errors.New("owner is not a SnapshotWriter") + } + + // List all snapshots in owners namespace + var snapshots v1alpha1.SnapshotList + + if err := clientK8s.List(ctx, &snapshots, client.InNamespace(ownerSnapshot.GetNamespace())); err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + + // Check for snapshot referenced by owner + for _, snapshot := range snapshots.Items { + for _, ref := range snapshot.ObjectMeta.OwnerReferences { + if ownerSnapshot.GetUID() == ref.UID { + return &snapshot, nil + } + } + } + + return nil, errorsK8s.NewNotFound(schema.GroupResource{Resource: "snapshots"}, "snapshot not found") +}