Skip to content

Commit

Permalink
Merge pull request #9819 from miminar/remote-layer-federation
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored Aug 7, 2016
2 parents 681c6f2 + db956d8 commit 7998ae4
Show file tree
Hide file tree
Showing 21 changed files with 3,103 additions and 223 deletions.
1 change: 1 addition & 0 deletions images/dockerregistry/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ middleware:
pullthrough: true
enforcequota: false
projectcachettl: 1m
blobrepositorycachettl: 10m
storage:
- name: openshift
38 changes: 16 additions & 22 deletions pkg/dockerregistry/server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,36 @@ const (
TokenRealmKey = "token-realm"
)

// RegistryClient encapsulates getting access to the OpenShift API.
type RegistryClient interface {
// Clients returns the authenticated clients to use with the server, or an
// error.
Clients() (client.Interface, kclient.Interface, error)
// SafeClientConfig returns a client config without authentication info.
SafeClientConfig() restclient.Config
}

// DefaultRegistryClient is exposed for testing the registry with a fake client.
// By default it wraps the config produced by clientcmd.NewConfig().BindToFile().
var DefaultRegistryClient = NewRegistryClient(clientcmd.NewConfig().BindToFile())

// RegistryClient encapsulates getting access to the OpenShift API.
type RegistryClient struct {
// registryClient implements RegistryClient
type registryClient struct {
config *clientcmd.Config
}

// Compile-time check that *registryClient satisfies RegistryClient.
var _ RegistryClient = (*registryClient)(nil)

// NewRegistryClient creates a registry client.
func NewRegistryClient(config *clientcmd.Config) *RegistryClient {
return &RegistryClient{config: config}
func NewRegistryClient(config *clientcmd.Config) RegistryClient {
return &registryClient{config: config}
}

// Client returns the authenticated client to use with the server.
func (r *RegistryClient) Clients() (client.Interface, kclient.Interface, error) {
func (r *registryClient) Clients() (client.Interface, kclient.Interface, error) {
return r.config.Clients()
}

// SafeClientConfig returns a client config without authentication info.
func (r *RegistryClient) SafeClientConfig() restclient.Config {
func (r *registryClient) SafeClientConfig() restclient.Config {
return clientcmd.AnonymousClientConfig(r.config.OpenShiftConfig())
}

Expand Down Expand Up @@ -327,22 +337,6 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
return WithUserClient(ctx, osClient), nil
}

// getNamespaceName splits a resource name of the form "<namespace>/<name>" at
// its first slash and returns the two parts. Everything after the first slash
// belongs to the name, so the name itself may contain further slashes.
// ErrNamespaceRequired is returned when there is no slash or when either part
// is empty.
func getNamespaceName(resourceName string) (string, string, error) {
	slash := strings.Index(resourceName, "/")
	if slash < 0 {
		return "", "", ErrNamespaceRequired
	}

	namespace, name := resourceName[:slash], resourceName[slash+1:]
	if namespace == "" || name == "" {
		return "", "", ErrNamespaceRequired
	}
	return namespace, name, nil
}

func getOpenShiftAPIToken(ctx context.Context, req *http.Request) (string, error) {
token := ""

Expand Down
179 changes: 178 additions & 1 deletion pkg/dockerregistry/server/blobdescriptorservice.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
package server

import (
"fmt"
"sort"
"time"

"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/middleware/registry"
"github.com/docker/distribution/registry/storage"

kerrors "k8s.io/kubernetes/pkg/api/errors"

imageapi "github.com/openshift/origin/pkg/image/api"
)

// ByGeneration orders tag events by descending Generation, so that sorting
// with it places the latest events first and the oldest last.
type ByGeneration []*imageapi.TagEvent

// Len reports the number of tag events.
func (b ByGeneration) Len() int {
	return len(b)
}

// Less reports whether the event at i has a higher generation than the one at j.
func (b ByGeneration) Less(i, j int) bool {
	return b[j].Generation < b[i].Generation
}

// Swap exchanges the events at positions i and j.
func (b ByGeneration) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}

func init() {
middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{}))
}
Expand All @@ -25,6 +40,168 @@ type blobDescriptorService struct {
distribution.BlobDescriptorService
}

// Stat returns a blob descriptor if the given blob is either linked in the repository or is referenced in
// the corresponding image stream. This method is invoked from inside of upstream's linkedBlobStore. It
// expects a proper repository object to be set on the given context by upper openshift middleware wrappers.
//
// The lookup proceeds in three steps:
//  1. the wrapped BlobDescriptorService is asked for the repository layer link; on success the
//     (digest, repository) association is remembered in the layer cache and the descriptor is returned,
//  2. otherwise the blob must exist in the registry's global blob store; its error is returned if it
//     does not,
//  3. finally the blob must be referenced by the repository's image stream.
//
// distribution.ErrBlobUnknown is returned when the blob exists locally but is not referenced by the image
// stream. An error is also returned when no repository is stored in ctx.
func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	repo, found := RepositoryFrom(ctx)
	if !found || repo == nil {
		err := fmt.Errorf("failed to retrieve repository from context")
		context.GetLogger(ctx).Error(err)
		return distribution.Descriptor{}, err
	}

	// if there is a repo layer link, return its descriptor
	desc, err := bs.BlobDescriptorService.Stat(ctx, dgst)
	if err == nil {
		// and remember the association
		repo.cachedLayers.RememberDigest(dgst, repo.blobrepositorycachettl, imageapi.DockerImageReference{
			Namespace: repo.namespace,
			Name:      repo.name,
		}.Exact())
		return desc, nil
	}

	context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), err)

	// verify the blob is stored locally
	desc, err = dockerRegistry.BlobStatter().Stat(ctx, dgst)
	if err != nil {
		return desc, err
	}

	// ensure it's referenced inside of corresponding image stream
	if imageStreamHasBlob(repo, dgst) {
		return desc, nil
	}

	return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

// Clear makes the layer cache forget the association between the given digest
// and the repository stored in ctx, then delegates to the wrapped
// BlobDescriptorService. An error is logged and returned when no repository
// can be retrieved from ctx.
func (bs *blobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
	repo, ok := RepositoryFrom(ctx)
	if repo == nil || !ok {
		err := fmt.Errorf("failed to retrieve repository from context")
		context.GetLogger(ctx).Error(err)
		return err
	}

	ref := imageapi.DockerImageReference{Namespace: repo.namespace, Name: repo.name}
	repo.cachedLayers.ForgetDigest(dgst, ref.Exact())

	return bs.BlobDescriptorService.Clear(ctx, dgst)
}

// imageStreamHasBlob reports whether the given blob digest is referenced in the image stream corresponding
// to the given repository. The layer cache is consulted first. On a cache miss the image stream is fetched
// and its tag events are walked from the highest generation to the lowest, inspecting each distinct image
// once, until an image referencing the blob is found. The outcome and the time taken are logged at debug
// level.
func imageStreamHasBlob(r *repository, dgst digest.Digest) bool {
	blob := dgst.String()
	cacheKey := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name}.Exact()

	if r.cachedLayers.RepositoryHasBlob(cacheKey, dgst) {
		context.GetLogger(r.ctx).Debugf("found cached blob %q in repository %s", blob, r.Named().Name())
		return true
	}

	context.GetLogger(r.ctx).Debugf("verifying presence of blob %q in image stream %s/%s", blob, r.namespace, r.name)
	started := time.Now()

	// report logs the verdict together with the elapsed time and passes the verdict through.
	report := func(found bool) bool {
		took := time.Since(started)
		if !found {
			context.GetLogger(r.ctx).Debugf("detected absence of blob %q in image stream %s/%s after %s", blob, r.namespace, r.name, took.String())
			return false
		}
		context.GetLogger(r.ctx).Debugf("verified presence of blob %q in image stream %s/%s after %s", blob, r.namespace, r.name, took.String())
		return true
	}

	// verify directly with etcd
	stream, err := r.getImageStream()
	if err != nil {
		context.GetLogger(r.ctx).Errorf("failed to get image stream: %v", err)
		return report(false)
	}

	// Flatten the events of all tags into one list, remembering the tag each event came from.
	var events []*imageapi.TagEvent
	tagOf := make(map[*imageapi.TagEvent]string)
	for tag, history := range stream.Status.Tags {
		for idx := range history.Items {
			ev := &history.Items[idx]
			tagOf[ev] = tag
			events = append(events, ev)
		}
	}
	// search from youngest to oldest
	sort.Sort(ByGeneration(events))

	// Several events may point at the same image; each image is inspected only once.
	seen := make(map[string]bool)
	for _, ev := range events {
		if seen[ev.Image] {
			continue
		}
		seen[ev.Image] = true

		if !imageHasBlob(r, cacheKey, ev.Image, blob, !r.pullthrough) {
			continue
		}
		context.GetLogger(r.ctx).Debugf("blob found under istag %s/%s:%s in image %s", r.namespace, r.name, tagOf[ev], ev.Image)
		return report(true)
	}

	context.GetLogger(r.ctx).Warnf("blob %q exists locally but is not referenced in repository %s/%s", blob, r.namespace, r.name)

	return report(false)
}

// imageHasBlob reports whether the image identified by imageName refers to the given blob.
//
// The image is fetched through r.getImage. If requireManaged is true and the image does not carry the
// managed-by-openshift annotation set to "true", the image is skipped and false is returned. If the image
// has no layers recorded and no manifest media type set, its metadata is filled in with
// imageapi.ImageWithMetadata before the layers are compared. When a layer named blobDigest is found, all
// layers of the image are remembered in the local cache under cacheName.
//
// Any failure to fetch the image or to obtain its metadata is logged and results in false.
func imageHasBlob(
	r *repository,
	cacheName,
	imageName,
	blobDigest string,
	requireManaged bool,
) bool {
	context.GetLogger(r.ctx).Debugf("getting image %s", imageName)
	image, err := r.getImage(digest.Digest(imageName))
	if err != nil {
		if kerrors.IsNotFound(err) {
			// The image name is passed as an argument so that the %q verb is actually filled in.
			context.GetLogger(r.ctx).Debugf("image %q not found", imageName)
		} else {
			context.GetLogger(r.ctx).Errorf("failed to get image: %v", err)
		}
		return false
	}

	// in case of pullthrough disabled, client won't be able to download a blob belonging to not managed image
	// (image stored in external registry), thus don't consider them as candidates
	if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; requireManaged && managed != "true" {
		context.GetLogger(r.ctx).Debugf("skipping not managed image")
		return false
	}

	if len(image.DockerImageLayers) == 0 {
		if len(image.DockerImageManifestMediaType) > 0 {
			// If the media type is set, we can safely assume that the best effort to fill the image layers
			// has already been done. There are none.
			return false
		}
		if err := imageapi.ImageWithMetadata(image); err != nil {
			context.GetLogger(r.ctx).Errorf("failed to get metadata for image %s: %v", imageName, err)
			return false
		}
	}

	for _, layer := range image.DockerImageLayers {
		if layer.Name == blobDigest {
			// remember all the layers of matching image
			r.rememberLayersOfImage(image, cacheName)
			return true
		}
	}

	return false
}
Loading

0 comments on commit 7998ae4

Please sign in to comment.