Skip to content

Commit

Permalink
Enable remote layer federation
Browse files Browse the repository at this point in the history
During a cross-repo mount push where the mounted blob belongs to remote
image, tag the image into target image stream. This will utilize
pullthrough to federate image's blobs without storing them in the
registry.

Image is tagged under `_pullthrough_dep_${blobdgst}` tag.

Signed-off-by: Michal Minář <[email protected]>
  • Loading branch information
Michal Minář committed Jul 29, 2016
1 parent 2a874eb commit 1670b8c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 5 deletions.
92 changes: 87 additions & 5 deletions pkg/dockerregistry/server/errorblobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"

kapi "k8s.io/kubernetes/pkg/api"
kerrors "k8s.io/kubernetes/pkg/api/errors"

imageapi "github.com/openshift/origin/pkg/image/api"
)

// errorBlobStore wraps a distribution.BlobStore for a particular repo.
Expand Down Expand Up @@ -55,14 +60,21 @@ func (r *errorBlobStore) Create(ctx context.Context, options ...distribution.Blo
}

ctx = WithRepository(ctx, r.repo)
var pullthroughSourceImageReference *imageapi.DockerImageReference

opts, err := effectiveCreateOptions(options)
if err != nil {
return nil, err
}
err = checkPendingCrossMountErrors(ctx, opts)
if err == nil && opts.Mount.ShouldMount {
desc, err = statSourceRepository(ctx, opts.Mount.From, opts.Mount.From.Digest())
context.GetLogger(ctx).Debugf("checking for presence of blob %s in a source repository %s", opts.Mount.From.Digest().String(), opts.Mount.From.Name())
desc, pullthroughSourceImageReference, err = statSourceRepository(ctx, r.repo, opts.Mount.From, opts.Mount.From.Digest())
}
if err == nil && pullthroughSourceImageReference != nil {
ref := pullthroughSourceImageReference.MostSpecific()
context.GetLogger(ctx).Debugf("trying to tag source image %s into image stream %s", ref.Exact(), r.repo.Named().Name())
err = tagPullthroughSourceImageInTargetRepository(ctx, &ref, r.repo, opts.Mount.From.Digest())
}

if err != nil {
Expand Down Expand Up @@ -154,10 +166,80 @@ func (f statCrossMountCreateOptions) Apply(v interface{}) error {
return nil
}

func statSourceRepository(ctx context.Context, sourceRepoName reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
repo, err := dockerRegistry.Repository(ctx, sourceRepoName)
// statSourceRepository founds a blob in the source repository of cross-repo mount and returns its descriptor
// if found. If the blob is not stored locally but it's available in remote repository, the
// pullthroughSourceImageReference output parameter will be set to contain a reference of an image containing
// it.
func statSourceRepository(
ctx context.Context,
destRepo *repository,
sourceRepoName reference.Named,
dgst digest.Digest,
) (desc distribution.Descriptor, pullthroughSourceImageReference *imageapi.DockerImageReference, err error) {
upstreamRepo, err := dockerRegistry.Repository(ctx, sourceRepoName)
if err != nil {
return distribution.Descriptor{}, err
return distribution.Descriptor{}, nil, err
}
namespace, name, err := getNamespaceName(sourceRepoName.Name())
if err != nil {
return distribution.Descriptor{}, nil, err
}

repo := *destRepo
repo.namespace = namespace
repo.name = name
repo.Repository = upstreamRepo

// ask pullthrough blob store to set source image reference if the blob is found in remote repository
var ref imageapi.DockerImageReference
ctx = WithPullthroughSourceImageReference(ctx, &ref)

desc, err = repo.Blobs(ctx).Stat(ctx, dgst)
if err == nil && len(ref.Registry) != 0 {
pullthroughSourceImageReference = &ref
}
return repo.Blobs(ctx).Stat(ctx, dgst)
return
}

// tagPullthroughSourceImageInTargetRepository creates a tag in a destination image stream of cross-repo mount
// referencing a remote image that contains the blob. With the reference present in the target stream, the
// pullthrough will allow to serve the blob from the image stream without storing it locally.
func tagPullthroughSourceImageInTargetRepository(ctx context.Context, ref *imageapi.DockerImageReference, destRepo *repository, dgst digest.Digest) error {
if len(ref.ID) == 0 {
return fmt.Errorf("cannot tag image lacking ID as a pullthrough source (%s)", ref.Exact())
}

tag := fmt.Sprintf("_pullthrough_dep_%s", dgst.Hex()[0:6])

is, err := destRepo.getImageStream()
if err != nil {
if !kerrors.IsNotFound(err) {
return err
}

// create image stream
stream := imageapi.ImageStream{
ObjectMeta: kapi.ObjectMeta{
Name: destRepo.name,
},
}
context.GetLogger(ctx).Infof("creating image stream to hold pullthrough source image %q for blob %q", ref.Exact(), dgst.String())
is, err = destRepo.registryOSClient.ImageStreams(destRepo.namespace).Create(&stream)
if kerrors.IsAlreadyExists(err) {
is, err = destRepo.getImageStream()
if err != nil {
return err
}
}
}

_, err = imageapi.ResolveImageID(is, ref.ID)
if err == nil {
context.GetLogger(ctx).Debugf("source image %s is already rererenced in image stream", ref.ID)
return err
}

// TODO: there's a danger of creating several similar tags for different blobs during a single image push
context.GetLogger(ctx).Infof("creating istag %s:%s referencing image %q", destRepo.Named().Name(), tag, ref.ID)
return destRepo.Tags(ctx).Tag(ctx, tag, distribution.Descriptor{Digest: digest.Digest(ref.ID)})
}
9 changes: 9 additions & 0 deletions pkg/dockerregistry/server/pullthroughblobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

// if not nil, higher level asks us to set image reference if found
pullthroughSourceImage, _ := PullthroughSourceImageReferenceFrom(ctx)

// see if any of the previously located repositories containing this digest are in this
// image stream
for _, repo := range cachedLayers {
Expand All @@ -151,6 +154,9 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear
continue
}
context.GetLogger(r.repo.ctx).Infof("Found digest location from cache %q in %q: %v", dgst, repo, err)
if pullthroughSourceImage != nil {
*pullthroughSourceImage = *ref
}
return desc, nil
}

Expand All @@ -162,6 +168,9 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear
}
r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo)
context.GetLogger(r.repo.ctx).Infof("Found digest location by search %q in %q: %v", dgst, repo, err)
if pullthroughSourceImage != nil {
*pullthroughSourceImage = *ref
}
return desc, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/dockerregistry/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
const (
// repositoryKey serves to store/retrieve repository object to/from context.
repositoryKey = "openshift.repository"

pullthroughSourceImageReferenceKey = "openshift.pullthrough.sourceImage"
)

func WithRepository(parent context.Context, repo *repository) context.Context {
Expand All @@ -27,6 +29,13 @@ func RepositoryFrom(ctx context.Context) (repo *repository, found bool) {
repo, found = ctx.Value(repositoryKey).(*repository)
return
}
func WithPullthroughSourceImageReference(parent context.Context, ref *imageapi.DockerImageReference) context.Context {
return context.WithValue(parent, pullthroughSourceImageReferenceKey, ref)
}
func PullthroughSourceImageReferenceFrom(ctx context.Context) (ref *imageapi.DockerImageReference, found bool) {
ref, found = ctx.Value(pullthroughSourceImageReferenceKey).(*imageapi.DockerImageReference)
return
}

func getOptionValue(
envVar string,
Expand Down
2 changes: 2 additions & 0 deletions test/end-to-end/core.sh
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '$
os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/hello-world'" "202 Accepted"
# 201 means that blob has been cross mounted from given repository
os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/ruby-22-centos7'" "201 Created"
os::cmd::expect_success_and_text "oc get -n crossmount istag repo:_pullthrough_dep_${rubyimageblob:7:6}" "$rubyimagedigest"
os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "200 OK"
# check that the blob is linked now
os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "200 OK"
# remove pusher's permissions to read from the source repository
Expand Down

0 comments on commit 1670b8c

Please sign in to comment.