Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Public GC function of oci.Store #656

Merged
merged 25 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions content/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"

Expand Down Expand Up @@ -454,6 +455,77 @@ func (s *Store) writeIndexFile() error {
return os.WriteFile(s.indexPath, indexJSON, 0666)
}

// reloadIndex refreshes the store's in-memory state (index, storage,
// tag resolver and graph) by constructing a fresh store rooted at the
// same directory and adopting its fields.
func (s *Store) reloadIndex(ctx context.Context) error {
	refreshed, err := NewWithContext(ctx, s.root)
	if err != nil {
		return err
	}
	s.index = refreshed.index
	s.storage = refreshed.storage
	s.tagResolver = refreshed.tagResolver
	s.graph = refreshed.graph
	return nil
}

// GC removes garbage from Store. Unsaved index will be lost. To prevent unexpected
// loss, call SaveIndex() before GC or set AutoSaveIndex to true.
// The garbage to be cleaned are:
//   - unreferenced (dangling) blobs in Store which have no predecessors
//   - garbage blobs in the storage whose metadata is not stored in Store
func (s *Store) GC(ctx context.Context) error {
	s.sync.Lock()
	defer s.sync.Unlock()

	// reload the index to compute the set of digests reachable from it
	if err := s.reloadIndex(ctx); err != nil {
		return fmt.Errorf("unable to reload index: %w", err)
	}
	reachable := s.graph.DigestSet()

	// walk blobs/<alg>/<digest> and drop every entry that is not reachable
	blobsRoot := filepath.Join(s.root, ocispec.ImageBlobsDir)
	algDirs, err := os.ReadDir(blobsRoot)
	if err != nil {
		return err
	}
	for _, algDir := range algDirs {
		alg := algDir.Name()
		// only descend into directories of supported algorithms
		if !algDir.IsDir() || !isKnownAlgorithm(alg) {
			continue
		}
		algPath := path.Join(blobsRoot, alg)
		digestEntries, err := os.ReadDir(algPath)
		if err != nil {
			return err
		}
		for _, digestEntry := range digestEntries {
			// honor cancellation between blob removals
			if err := isContextDone(ctx); err != nil {
				return err
			}
			encoded := digestEntry.Name()
			blobDigest := digest.NewDigestFromEncoded(digest.Algorithm(alg), encoded)
			if blobDigest.Validate() != nil {
				// not a valid blob file name; skip irrelevant content
				continue
			}
			if reachable.Contains(blobDigest) {
				continue
			}
			// the blob has no metadata in Store; remove it from storage
			if err := os.Remove(path.Join(algPath, encoded)); err != nil {
				return err
			}
		}
	}
	return nil
}

// unsafeStore is used to bypass lock restrictions in Delete.
type unsafeStore struct {
*Store
Expand All @@ -467,6 +539,17 @@ func (s *unsafeStore) Predecessors(ctx context.Context, node ocispec.Descriptor)
return s.graph.Predecessors(ctx, node)
}

// isContextDone returns an error if the context is done.
// Reference: https://pkg.go.dev/context#Context
func isContextDone(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

// validateReference validates ref.
func validateReference(ref string) error {
if ref == "" {
Expand All @@ -476,3 +559,13 @@ func validateReference(ref string) error {
// TODO: may enforce more strict validation if needed.
return nil
}

// isKnownAlgorithm checks if a string is a supported hash algorithm
// (SHA-256, SHA-384 or SHA-512). GC uses it to skip directories under
// blobs/ that cannot contain content-addressed blobs.
func isKnownAlgorithm(alg string) bool {
	switch digest.Algorithm(alg) {
	case digest.SHA256, digest.SHA512, digest.SHA384:
		return true
	default:
		return false
	}
}
206 changes: 206 additions & 0 deletions content/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
Expand Down Expand Up @@ -2844,6 +2845,199 @@ func TestStore_UntagErrorPath(t *testing.T) {
}
}

// TestStore_GC verifies that GC removes dangling blobs (present in the store
// but unreferenced by index.json) and garbage blobs (present in the storage
// with no metadata in the store), while keeping reachable content intact.
func TestStore_GC(t *testing.T) {
	tempDir := t.TempDir()
	s, err := New(tempDir)
	if err != nil {
		t.Fatal("New() error =", err)
	}
	ctx := context.Background()

	// generate test content
	var blobs [][]byte
	var descs []ocispec.Descriptor
	appendBlob := func(mediaType string, blob []byte) {
		blobs = append(blobs, blob)
		descs = append(descs, ocispec.Descriptor{
			MediaType: mediaType,
			Digest:    digest.FromBytes(blob),
			Size:      int64(len(blob)),
		})
	}
	generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
		manifest := ocispec.Manifest{
			Config:  config,
			Subject: subject,
			Layers:  layers,
		}
		manifestJSON, err := json.Marshal(manifest)
		if err != nil {
			t.Fatal(err)
		}
		appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
	}
	generateImageIndex := func(manifests ...ocispec.Descriptor) {
		index := ocispec.Index{
			Manifests: manifests,
		}
		indexJSON, err := json.Marshal(index)
		if err != nil {
			t.Fatal(err)
		}
		appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
	}
	generateArtifactManifest := func(blobs ...ocispec.Descriptor) {
		var manifest spec.Artifact
		manifest.Blobs = append(manifest.Blobs, blobs...)
		manifestJSON, err := json.Marshal(manifest)
		if err != nil {
			t.Fatal(err)
		}
		appendBlob(spec.MediaTypeArtifactManifest, manifestJSON)
	}

	appendBlob(ocispec.MediaTypeImageConfig, []byte("config"))         // Blob 0
	appendBlob(ocispec.MediaTypeImageLayer, []byte("blob"))            // Blob 1
	appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer"))  // Blob 2, dangling layer
	generateManifest(descs[0], nil, descs[1])                          // Blob 3, valid manifest
	generateManifest(descs[0], &descs[3], descs[1])                    // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation
	appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer
	generateArtifactManifest(descs[4])                                 // blob 6, dangling artifact
	generateManifest(descs[0], &descs[5], descs[1])                    // Blob 7, referrer of a dangling manifest
	appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 3")) // Blob 8, dangling layer
	generateArtifactManifest(descs[6])                                 // blob 9, dangling artifact
	generateImageIndex(descs[7], descs[5])                             // blob 10, dangling image index
	appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 1")) // Blob 11, garbage layer 1
	generateManifest(descs[0], nil, descs[4])                          // Blob 12, garbage manifest 1
	appendBlob(ocispec.MediaTypeImageConfig, []byte("garbage config")) // Blob 13, garbage config
	appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2
	generateManifest(descs[6], nil, descs[7])                          // Blob 15, garbage manifest 2
	generateManifest(descs[0], &descs[13], descs[1])                   // Blob 16, referrer of a garbage manifest

	// push blobs 0 - blobs 10 into s; later steps depend on these pushes,
	// so a failure here aborts the test immediately
	for i := 0; i <= 10; i++ {
		err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
		if err != nil {
			t.Fatalf("failed to push test content to src: %d: %v", i, err)
		}
	}

	// remove blobs 4 - blobs 10 from index.json
	for i := 4; i <= 10; i++ {
		s.tagResolver.Untag(string(descs[i].Digest))
	}
	// persist the trimmed index; GC reloads it from disk, so an unnoticed
	// save failure would invalidate the expectations below
	if err := s.SaveIndex(); err != nil {
		t.Fatal("SaveIndex() error =", err)
	}

	// push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata
	// doesn't exist in s
	for i := 11; i < len(blobs); i++ {
		err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
		if err != nil {
			t.Fatalf("failed to push test content to src: %d: %v", i, err)
		}
	}

	// confirm that the garbage blobs are in the storage before GC
	for i := 11; i < len(blobs); i++ {
		exists, err := s.Exists(ctx, descs[i])
		if err != nil {
			t.Fatal(err)
		}
		if !exists {
			t.Fatalf("descs[%d] should exist", i)
		}
	}

	// perform GC
	if err = s.GC(ctx); err != nil {
		t.Fatal(err)
	}

	// verify existence: only blobs 0, 1 and 3 (config, layer, valid manifest)
	// survive; every dangling or garbage blob must be gone
	wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false}
	for i, wantValue := range wantExistence {
		exists, err := s.Exists(ctx, descs[i])
		if err != nil {
			t.Fatal(err)
		}
		if exists != wantValue {
			t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists)
		}
	}
}

// TestStore_GCErrorPath exercises GC's error handling: irrelevant files and
// unknown algorithm directories under blobs/ are silently skipped, while a
// failing os.Remove (a non-empty directory masquerading as a blob) surfaces
// as an error.
func TestStore_GCErrorPath(t *testing.T) {
	tempDir := t.TempDir()
	s, err := New(tempDir)
	if err != nil {
		t.Fatal("New() error =", err)
	}
	ctx := context.Background()

	// generate test content
	var blobs [][]byte
	var descs []ocispec.Descriptor
	appendBlob := func(mediaType string, blob []byte) {
		blobs = append(blobs, blob)
		descs = append(descs, ocispec.Descriptor{
			MediaType: mediaType,
			Digest:    digest.FromBytes(blob),
			Size:      int64(len(blob)),
		})
	}
	appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob")) // Blob 0

	// push the valid blob; the GC calls below rely on it being present
	err = s.Push(ctx, descs[0], bytes.NewReader(blobs[0]))
	if err != nil {
		t.Fatal("failed to push test content to src")
	}

	// write random contents that are not valid blobs; GC must skip them
	algPath := path.Join(tempDir, "blobs")
	dgstPath := path.Join(algPath, "sha256")
	if err := os.WriteFile(path.Join(algPath, "other"), []byte("random"), 0444); err != nil {
		t.Fatal("error calling WriteFile(), error =", err)
	}
	if err := os.WriteFile(path.Join(dgstPath, "other2"), []byte("random2"), 0444); err != nil {
		t.Fatal("error calling WriteFile(), error =", err)
	}

	// perform GC; the stray files above must not cause a failure
	if err = s.GC(ctx); err != nil {
		t.Fatal(err)
	}

	appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob 2")) // Blob 1

	// push the valid blob
	err = s.Push(ctx, descs[1], bytes.NewReader(blobs[1]))
	if err != nil {
		t.Fatal("failed to push test content to src")
	}

	// unknown algorithm directory must be skipped without error
	if err := os.Mkdir(path.Join(algPath, "sha666"), 0777); err != nil {
		t.Fatal(err)
	}
	if err = s.GC(ctx); err != nil {
		t.Fatal("this error should be silently ignored")
	}

	// os.Remove() error: a directory named like a valid digest but containing
	// a file cannot be removed with os.Remove, so GC must report an error
	badDigest := digest.FromBytes([]byte("bad digest")).Encoded()
	badPath := path.Join(algPath, "sha256", badDigest)
	if err := os.Mkdir(badPath, 0777); err != nil {
		t.Fatal(err)
	}
	if err := os.WriteFile(path.Join(badPath, "whatever"), []byte("extra content"), 0444); err != nil {
		t.Fatal("error calling WriteFile(), error =", err)
	}
	if err = s.GC(ctx); err == nil {
		t.Fatal("expect an error when os.Remove()")
	}
}

func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descriptor) bool {
if len(actual) != len(expected) {
return false
Expand All @@ -2863,3 +3057,15 @@ func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descript
}
return true
}

// Test_isContextDone covers both branches of isContextDone: a live context
// yields nil, a canceled one yields context.Canceled.
func Test_isContextDone(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	if got := isContextDone(ctx); got != nil {
		t.Errorf("expect error = %v, got %v", nil, got)
	}
	cancel()
	if got := isContextDone(ctx); got != context.Canceled {
		t.Errorf("expect error = %v, got %v", context.Canceled, got)
	}
}
10 changes: 10 additions & 0 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"sync"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
Expand Down Expand Up @@ -147,6 +148,15 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {
return danglings
}

// DigestSet returns the set of digests of all nodes currently tracked
// in memory.
func (m *Memory) DigestSet() set.Set[digest.Digest] {
	digests := set.New[digest.Digest]()
	for node := range m.nodes {
		digests.Add(node.Digest)
	}
	return digests
}

// index indexes predecessors for each direct successor of the given node.
func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
successors, err := content.Successors(ctx, fetcher, node)
Expand Down
Loading