Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Enable remote layer federation #10120

Closed
1 change: 1 addition & 0 deletions images/dockerregistry/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ middleware:
pullthrough: true
enforcequota: false
projectcachettl: 1m
blobrepositorycachettl: 10m
storage:
- name: openshift
38 changes: 16 additions & 22 deletions pkg/dockerregistry/server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,36 @@ const (
TokenRealmKey = "token-realm"
)

// RegistryClient encapsulates getting access to the OpenShift API.
type RegistryClient interface {
// Clients returns the authenticated clients to use with the server, or an
// error.
Clients() (client.Interface, kclient.Interface, error)
// SafeClientConfig returns a client config without authentication info.
SafeClientConfig() restclient.Config
}

// DefaultRegistryClient is exposed for testing the registry with a fake client.
// It is built by NewRegistryClient from clientcmd.NewConfig().BindToFile().
var DefaultRegistryClient = NewRegistryClient(clientcmd.NewConfig().BindToFile())

// RegistryClient encapsulates getting access to the OpenShift API.
type RegistryClient struct {
// registryClient implements RegistryClient on top of a clientcmd.Config.
type registryClient struct {
// config is the client configuration the methods delegate to.
config *clientcmd.Config
}

// Compile-time check that *registryClient satisfies RegistryClient.
var _ RegistryClient = &registryClient{}

// NewRegistryClient creates a registry client that uses the given config.
func NewRegistryClient(config *clientcmd.Config) *RegistryClient {
return &RegistryClient{config: config}
func NewRegistryClient(config *clientcmd.Config) RegistryClient {
return &registryClient{config: config}
}

// Clients returns the authenticated clients to use with the server. It
// delegates to the Clients method of the underlying config.
func (r *RegistryClient) Clients() (client.Interface, kclient.Interface, error) {
func (r *registryClient) Clients() (client.Interface, kclient.Interface, error) {
return r.config.Clients()
}

// SafeClientConfig returns a client config without authentication info. The
// result is produced by passing the underlying OpenShift config through
// clientcmd.AnonymousClientConfig.
func (r *RegistryClient) SafeClientConfig() restclient.Config {
func (r *registryClient) SafeClientConfig() restclient.Config {
return clientcmd.AnonymousClientConfig(r.config.OpenShiftConfig())
}

Expand Down Expand Up @@ -327,22 +337,6 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
return WithUserClient(ctx, osClient), nil
}

// getNamespaceName splits resourceName at its first "/" into a namespace and
// a name. Everything after the first slash (including further slashes)
// belongs to the name. ErrNamespaceRequired is returned when there is no
// slash or when either part is empty.
func getNamespaceName(resourceName string) (string, string, error) {
	sep := strings.Index(resourceName, "/")
	// sep < 0: no slash at all; sep == 0: empty namespace;
	// sep == last index: empty name.
	if sep <= 0 || sep == len(resourceName)-1 {
		return "", "", ErrNamespaceRequired
	}
	return resourceName[:sep], resourceName[sep+1:], nil
}

func getOpenShiftAPIToken(ctx context.Context, req *http.Request) (string, error) {
token := ""

Expand Down
166 changes: 165 additions & 1 deletion pkg/dockerregistry/server/blobdescriptorservice.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
package server

import (
"fmt"
"sort"

"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/middleware/registry"
"github.com/docker/distribution/registry/storage"

kerrors "k8s.io/kubernetes/pkg/api/errors"

imageapi "github.com/openshift/origin/pkg/image/api"
)

// ByGeneration implements sort.Interface over tag events, ordering them by
// descending Generation, i.e. from latest to oldest.
type ByGeneration []*imageapi.TagEvent

// Len reports the number of tag events.
func (b ByGeneration) Len() int {
	return len(b)
}

// Swap exchanges the tag events at positions i and j.
func (b ByGeneration) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}

// Less reports whether the event at i has a higher generation than the event
// at j, which places newer events first.
func (b ByGeneration) Less(i, j int) bool {
	return b[j].Generation < b[i].Generation
}

func init() {
middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{}))
}
Expand All @@ -25,6 +39,156 @@ type blobDescriptorService struct {
distribution.BlobDescriptorService
}

// Stat returns a blob descriptor if the given blob is either linked in the repository or is referenced in
// the corresponding image stream. This method is invoked from inside of upstream's linkedBlobStore. It
// expects a proper repository object to be set on the given context by upper openshift middleware wrappers.
//
// An error is returned when no repository is present on the context or when the blob is not stored
// locally; distribution.ErrBlobUnknown is returned when the blob exists locally but is not referenced by
// the repository's image stream.
func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	repo, found := RepositoryFrom(ctx)
	if !found || repo == nil {
		err := fmt.Errorf("failed to retrieve repository from context")
		context.GetLogger(ctx).Error(err)
		return distribution.Descriptor{}, err
	}

	// if there is a repo layer link, return its descriptor
	desc, err := bs.BlobDescriptorService.Stat(ctx, dgst)
	if err == nil {
		// and remember the association
		repo.cachedLayers.RememberDigest(dgst, repo.blobrepositorycachettl, imageapi.DockerImageReference{
			Namespace: repo.namespace,
			Name:      repo.name,
		}.Exact())
		return desc, nil
	}

	context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), err)

	// verify the blob is stored locally
	desc, err = dockerRegistry.BlobStatter().Stat(ctx, dgst)
	if err != nil {
		return desc, err
	}

	// a locally stored blob is only visible if the image stream references it
	if imageStreamHasBlob(repo, dgst) {
		return desc, nil
	}

	return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

// Clear drops the cached association between the digest and the repository
// found on the context, then delegates to the wrapped descriptor service's
// Clear. It fails if no repository is set on the context.
func (bs *blobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
	repo, ok := RepositoryFrom(ctx)
	if !ok || repo == nil {
		err := fmt.Errorf("failed to retrieve repository from context")
		context.GetLogger(ctx).Error(err)
		return err
	}

	ref := imageapi.DockerImageReference{Namespace: repo.namespace, Name: repo.name}
	repo.cachedLayers.ForgetDigest(dgst, ref.Exact())

	return bs.BlobDescriptorService.Clear(ctx, dgst)
}

// imageStreamHasBlob reports whether the given blob digest is referenced in the image stream that
// corresponds to the given repository. The cache of blob -> repositories is consulted first. On a miss
// the image stream is fetched and its tag events are checked from the highest generation to the lowest,
// each distinct image at most once, until one is found to contain the blob.
func imageStreamHasBlob(r *repository, dgst digest.Digest) bool {
	cached := r.cachedLayers.RepositoriesForDigest(dgst)
	match := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name}.Exact()
	for _, candidate := range cached {
		if candidate != match {
			continue
		}
		context.GetLogger(r.ctx).Debugf("found cached blob %q in repository %s/%s", dgst.String(), r.namespace, r.name)
		return true
	}

	// cache miss: fall back to the image stream itself
	stream, err := r.getImageStream()
	if err != nil {
		context.GetLogger(r.ctx).Errorf("failed to get image stream: %v", err)
		return false
	}

	// flatten all tag histories into one list, remembering each event's tag
	var events []*imageapi.TagEvent
	tagOf := map[*imageapi.TagEvent]string{}
	for tag, history := range stream.Status.Tags {
		for idx := range history.Items {
			ev := &history.Items[idx]
			events = append(events, ev)
			tagOf[ev] = tag
		}
	}
	// search from youngest to oldest
	sort.Sort(ByGeneration(events))

	// images already checked without success
	visited := make(map[string]struct{})

	for _, ev := range events {
		if _, done := visited[ev.Image]; done {
			continue
		}
		if !imageHasBlob(r, match, ev.Image, dgst.String(), !r.pullthrough) {
			visited[ev.Image] = struct{}{}
			continue
		}
		context.GetLogger(r.ctx).Debugf("blob found under istag %s/%s:%s in image %s", r.namespace, r.name, tagOf[ev], ev.Image)
		return true
	}

	context.GetLogger(r.ctx).Warnf("blob %q exists locally but is not referenced in repository %s/%s", dgst.String(), r.namespace, r.name)

	return false
}

// imageHasBlob returns true if the image identified by imageName refers to the given blob. The image is
// fetched. If requireManaged is true and the image is not managed (it refers to a remote registry), the
// image will not be processed. When a matching layer is found, all layers of the image are remembered in
// the local cache of blobs -> repositories under cacheName.
//
// Any failure to fetch the image or to obtain its metadata is logged and reported as false.
func imageHasBlob(
	r *repository,
	cacheName,
	imageName,
	blobDigest string,
	requireManaged bool,
) bool {
	image, err := r.getImage(digest.Digest(imageName))
	if err != nil {
		if kerrors.IsNotFound(err) {
			context.GetLogger(r.ctx).Debugf("image %q not found", imageName)
		} else {
			context.GetLogger(r.ctx).Errorf("failed to get image: %v", err)
		}
		return false
	}

	// in case of pullthrough disabled, client won't be able to download a blob belonging to not managed image
	// (image stored in external registry), thus don't consider them as candidates
	if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; requireManaged && managed != "true" {
		context.GetLogger(r.ctx).Debugf("skipping not managed image")
		return false
	}

	if len(image.DockerImageLayers) == 0 {
		if len(image.DockerImageManifestMediaType) > 0 {
			// image has no layers
			return false
		}
		// layers are not populated yet: fill in the image metadata first
		err = imageapi.ImageWithMetadata(image)
		if err != nil {
			context.GetLogger(r.ctx).Errorf("failed to get metadata for image %s: %v", imageName, err)
			return false
		}
	}

	for _, layer := range image.DockerImageLayers {
		if layer.Name == blobDigest {
			// remember all the layers of matching image
			r.rememberLayersOfImage(image, cacheName)
			return true
		}
	}

	return false
}
Loading