diff --git a/images/dockerregistry/config.yml b/images/dockerregistry/config.yml index 90b3dfdc8174..4e1ef8b048ce 100644 --- a/images/dockerregistry/config.yml +++ b/images/dockerregistry/config.yml @@ -23,5 +23,6 @@ middleware: pullthrough: true enforcequota: false projectcachettl: 1m + blobrepositorycachettl: 10m storage: - name: openshift diff --git a/pkg/dockerregistry/server/auth.go b/pkg/dockerregistry/server/auth.go index 41f4e5f06003..552960db7cbc 100644 --- a/pkg/dockerregistry/server/auth.go +++ b/pkg/dockerregistry/server/auth.go @@ -40,26 +40,36 @@ const ( TokenRealmKey = "token-realm" ) +// RegistryClient encapsulates getting access to the OpenShift API. +type RegistryClient interface { + // Clients return the authenticated client to use with the server. + Clients() (client.Interface, kclient.Interface, error) + // SafeClientConfig returns a client config without authentication info. + SafeClientConfig() restclient.Config +} + // DefaultRegistryClient is exposed for testing the registry with fake client. var DefaultRegistryClient = NewRegistryClient(clientcmd.NewConfig().BindToFile()) -// RegistryClient encapsulates getting access to the OpenShift API. -type RegistryClient struct { +// registryClient implements RegistryClient +type registryClient struct { config *clientcmd.Config } +var _ RegistryClient = ®istryClient{} + // NewRegistryClient creates a registry client. -func NewRegistryClient(config *clientcmd.Config) *RegistryClient { - return &RegistryClient{config: config} +func NewRegistryClient(config *clientcmd.Config) RegistryClient { + return ®istryClient{config: config} } // Client returns the authenticated client to use with the server. -func (r *RegistryClient) Clients() (client.Interface, kclient.Interface, error) { +func (r *registryClient) Clients() (client.Interface, kclient.Interface, error) { return r.config.Clients() } // SafeClientConfig returns a client config without authentication info. 
-func (r *RegistryClient) SafeClientConfig() restclient.Config { +func (r *registryClient) SafeClientConfig() restclient.Config { return clientcmd.AnonymousClientConfig(r.config.OpenShiftConfig()) } @@ -327,22 +337,6 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg return WithUserClient(ctx, osClient), nil } -func getNamespaceName(resourceName string) (string, string, error) { - repoParts := strings.SplitN(resourceName, "/", 2) - if len(repoParts) != 2 { - return "", "", ErrNamespaceRequired - } - ns := repoParts[0] - if len(ns) == 0 { - return "", "", ErrNamespaceRequired - } - name := repoParts[1] - if len(name) == 0 { - return "", "", ErrNamespaceRequired - } - return ns, name, nil -} - func getOpenShiftAPIToken(ctx context.Context, req *http.Request) (string, error) { token := "" diff --git a/pkg/dockerregistry/server/blobdescriptorservice.go b/pkg/dockerregistry/server/blobdescriptorservice.go index 7ace8d86fb11..b3e7048e1129 100644 --- a/pkg/dockerregistry/server/blobdescriptorservice.go +++ b/pkg/dockerregistry/server/blobdescriptorservice.go @@ -1,13 +1,28 @@ package server import ( + "fmt" + "sort" + "time" + "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/registry/middleware/registry" "github.com/docker/distribution/registry/storage" + + kerrors "k8s.io/kubernetes/pkg/api/errors" + + imageapi "github.com/openshift/origin/pkg/image/api" ) +// ByGeneration allows for sorting tag events from latest to oldest. 
+type ByGeneration []*imageapi.TagEvent + +func (b ByGeneration) Less(i, j int) bool { return b[i].Generation > b[j].Generation } +func (b ByGeneration) Len() int { return len(b) } +func (b ByGeneration) Swap(i, j int) { b[i], b[j] = b[j], b[i] } + func init() { middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{})) } @@ -25,6 +40,168 @@ type blobDescriptorService struct { distribution.BlobDescriptorService } +// Stat returns a blob descriptor if the given blob is either linked in repository or is referenced in +// corresponding image stream. This method is invoked from inside of upstream's linkedBlobStore. It expects +// a proper repository object to be set on given context by upper openshift middleware wrappers. func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { - return dockerRegistry.BlobStatter().Stat(ctx, dgst) + repo, found := RepositoryFrom(ctx) + if !found || repo == nil { + err := fmt.Errorf("failed to retrieve repository from context") + context.GetLogger(ctx).Error(err) + return distribution.Descriptor{}, err + } + + // if there is a repo layer link, return its descriptor + desc, err := bs.BlobDescriptorService.Stat(ctx, dgst) + if err == nil { + // and remember the association + repo.cachedLayers.RememberDigest(dgst, repo.blobrepositorycachettl, imageapi.DockerImageReference{ + Namespace: repo.namespace, + Name: repo.name, + }.Exact()) + return desc, nil + } + + context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), err) + + // verify the blob is stored locally + desc, err = dockerRegistry.BlobStatter().Stat(ctx, dgst) + if err != nil { + return desc, err + } + + // ensure it's referenced inside of corresponding image stream + if imageStreamHasBlob(repo, dgst) { + return desc, nil + } + + return distribution.Descriptor{}, distribution.ErrBlobUnknown +} + +func (bs 
*blobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error { + repo, found := RepositoryFrom(ctx) + if !found || repo == nil { + err := fmt.Errorf("failed to retrieve repository from context") + context.GetLogger(ctx).Error(err) + return err + } + + repo.cachedLayers.ForgetDigest(dgst, imageapi.DockerImageReference{ + Namespace: repo.namespace, + Name: repo.name, + }.Exact()) + return bs.BlobDescriptorService.Clear(ctx, dgst) +} + +// imageStreamHasBlob returns true if the given blob digest is referenced in image stream corresponding to +// given repository. If not found locally, image stream's images will be iterated and fetched from newest to +// oldest until found. Each processed image will update local cache of blobs. +func imageStreamHasBlob(r *repository, dgst digest.Digest) bool { + repoCacheName := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name}.Exact() + if r.cachedLayers.RepositoryHasBlob(repoCacheName, dgst) { + context.GetLogger(r.ctx).Debugf("found cached blob %q in repository %s", dgst.String(), r.Named().Name()) + return true + } + + context.GetLogger(r.ctx).Debugf("verifying presence of blob %q in image stream %s/%s", dgst.String(), r.namespace, r.name) + started := time.Now() + logFound := func(found bool) bool { + elapsed := time.Now().Sub(started) + if found { + context.GetLogger(r.ctx).Debugf("verified presence of blob %q in image stream %s/%s after %s", dgst.String(), r.namespace, r.name, elapsed.String()) + } else { + context.GetLogger(r.ctx).Debugf("detected absence of blob %q in image stream %s/%s after %s", dgst.String(), r.namespace, r.name, elapsed.String()) + } + return found + } + + // verify directly with etcd + is, err := r.getImageStream() + if err != nil { + context.GetLogger(r.ctx).Errorf("failed to get image stream: %v", err) + return logFound(false) + } + + tagEvents := []*imageapi.TagEvent{} + event2Name := make(map[*imageapi.TagEvent]string) + for name, eventList := range is.Status.Tags { + 
for i := range eventList.Items { + event := &eventList.Items[i] + tagEvents = append(tagEvents, event) + event2Name[event] = name + } + } + // search from youngest to oldest + sort.Sort(ByGeneration(tagEvents)) + + processedImages := map[string]struct{}{} + + for _, tagEvent := range tagEvents { + if _, processed := processedImages[tagEvent.Image]; processed { + continue + } + if imageHasBlob(r, repoCacheName, tagEvent.Image, dgst.String(), !r.pullthrough) { + tagName := event2Name[tagEvent] + context.GetLogger(r.ctx).Debugf("blob found under istag %s/%s:%s in image %s", r.namespace, r.name, tagName, tagEvent.Image) + return logFound(true) + } + processedImages[tagEvent.Image] = struct{}{} + } + + context.GetLogger(r.ctx).Warnf("blob %q exists locally but is not referenced in repository %s/%s", dgst.String(), r.namespace, r.name) + + return logFound(false) +} + +// imageHasBlob returns true if the image identified by imageName refers to the given blob. The image is +// fetched. If requireManaged is true and the image is not managed (it refers to remote registry), the image +// will not be processed. Fetched image will update local cache of blobs -> repositories with (blobDigest, +// cacheName) pairs. 
+func imageHasBlob( + r *repository, + cacheName, + imageName, + blobDigest string, + requireManaged bool, +) bool { + context.GetLogger(r.ctx).Debugf("getting image %s", imageName) + image, err := r.getImage(digest.Digest(imageName)) + if err != nil { + if kerrors.IsNotFound(err) { + context.GetLogger(r.ctx).Debugf("image %q not found", imageName) + } else { + context.GetLogger(r.ctx).Errorf("failed to get image: %v", err) + } + return false + } + + // in case of pullthrough disabled, client won't be able to download a blob belonging to not managed image + // (image stored in external registry), thus don't consider them as candidates + if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; requireManaged && managed != "true" { + context.GetLogger(r.ctx).Debugf("skipping not managed image") + return false + } + + if len(image.DockerImageLayers) == 0 { + if len(image.DockerImageManifestMediaType) > 0 { + // If the media type is set, we can safely assume that the best effort to fill the image layers + // has already been done. There are none. 
+ return false + } + err = imageapi.ImageWithMetadata(image) + if err != nil { + context.GetLogger(r.ctx).Errorf("failed to get metadata for image %s: %v", imageName, err) + return false + } + } + + for _, layer := range image.DockerImageLayers { + if layer.Name == blobDigest { + // remember all the layers of matching image + r.rememberLayersOfImage(image, cacheName) + return true + } + } + + return false } diff --git a/pkg/dockerregistry/server/blobdescriptorservice_test.go b/pkg/dockerregistry/server/blobdescriptorservice_test.go new file mode 100644 index 000000000000..e20f4af2f96f --- /dev/null +++ b/pkg/dockerregistry/server/blobdescriptorservice_test.go @@ -0,0 +1,495 @@ +package server + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "sync" + "testing" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/configuration" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/api/errcode" + "github.com/docker/distribution/registry/api/v2" + registryauth "github.com/docker/distribution/registry/auth" + "github.com/docker/distribution/registry/handlers" + "github.com/docker/distribution/registry/middleware/registry" + "github.com/docker/distribution/registry/storage" + + registrytest "github.com/openshift/origin/pkg/dockerregistry/testutil" + "k8s.io/kubernetes/pkg/client/restclient" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient" + + osclient "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/client/testclient" + imagetest "github.com/openshift/origin/pkg/image/admission/testutil" +) + +// TestBlobDescriptorServiceIsApplied ensures that blobDescriptorService middleware gets applied. +// It relies on the fact that blobDescriptorService requires higher levels to set repository object on given +// context. 
If the object isn't given, its method will err out. +func TestBlobDescriptorServiceIsApplied(t *testing.T) { + ctx := context.Background() + + // don't do any authorization check + installFakeAccessController(t) + m := fakeBlobDescriptorService(t) + // to make other unit tests working + defer m.changeUnsetRepository(false) + + testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, true) + if err != nil { + t.Fatal(err) + } + testImageStream := testNewImageStreamObject("user", "app", "latest", testImage.Name) + client := &testclient.Fake{} + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, *testImageStream)) + client.AddReactor("get", "images", getFakeImageGetHandler(t, *testImage)) + + // TODO: get rid of those nasty global vars + backupRegistryClient := DefaultRegistryClient + DefaultRegistryClient = makeFakeRegistryClient(client, ktestclient.NewSimpleFake()) + defer func() { + // set it back once this test finishes to make other unit tests working + DefaultRegistryClient = backupRegistryClient + }() + + app := handlers.NewApp(ctx, &configuration.Configuration{ + Loglevel: "debug", + Auth: map[string]configuration.Parameters{ + fakeAuthorizerName: {"realm": fakeAuthorizerName}, + }, + Storage: configuration.Storage{ + "inmemory": configuration.Parameters{}, + "cache": configuration.Parameters{ + "blobdescriptor": "inmemory", + }, + "delete": configuration.Parameters{ + "enabled": true, + }, + }, + Middleware: map[string][]configuration.Middleware{ + "registry": {{Name: "openshift"}}, + "repository": {{Name: "openshift"}}, + "storage": {{Name: "openshift"}}, + }, + }) + server := httptest.NewServer(app) + router := v2.Router() + + serverURL, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + os.Setenv("DOCKER_REGISTRY_URL", serverURL.Host) + + desc, _, err := registrytest.UploadTestBlob(serverURL, "user/app") + if err != nil { + 
t.Fatal(err) + } + + for _, tc := range []struct { + name string + method string + endpoint string + vars []string + unsetRepository bool + expectedStatus int + expectedMethodInvocations map[string]int + }{ + { + name: "get blob with repository unset", + method: http.MethodGet, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "get blob", + method: http.MethodGet, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + expectedStatus: http.StatusOK, + // 1st stat is invoked in (*distribution/registry/handlers.blobHandler).GetBlob() as a + // check of blob existence + // 2nd stat happens in (*errorBlobStore).ServeBlob() invoked by the same GetBlob handler + // 3rd stat is done by (*blobServiceListener).ServeBlob once the blob serving is finished; + // it may happen with a slight delay after the blob was served + expectedMethodInvocations: map[string]int{"Stat": 3}, + }, + + { + name: "stat blob with repository unset", + method: http.MethodHead, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "stat blob", + method: http.MethodHead, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + expectedStatus: http.StatusOK, + // 1st stat is invoked in (*distribution/registry/handlers.blobHandler).GetBlob() as a + // check of blob existence + // 2nd stat happens in (*errorBlobStore).ServeBlob() invoked by the same GetBlob handler + // 3rd stat is done by (*blobServiceListener).ServeBlob once the blob serving is finished; + // it may happen with a slight 
delay after the blob was served + expectedMethodInvocations: map[string]int{"Stat": 3}, + }, + + { + name: "delete blob with repository unset", + method: http.MethodDelete, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "delete blob", + method: http.MethodDelete, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + expectedStatus: http.StatusAccepted, + expectedMethodInvocations: map[string]int{"Stat": 1, "Clear": 1}, + }, + + { + name: "get manifest with repository unset", + method: http.MethodGet, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", "latest", + }, + unsetRepository: true, + // succeed because blob store is not involved + expectedStatus: http.StatusOK, + // manifest is retrieved from etcd + expectedMethodInvocations: map[string]int{"Stat": 0}, + }, + + { + name: "get manifest", + method: http.MethodGet, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", "latest", + }, + expectedStatus: http.StatusOK, + // manifest is retrieved from etcd + expectedMethodInvocations: map[string]int{"Stat": 0}, + }, + + { + name: "delete manifest with repository unset", + method: http.MethodDelete, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", testImage.Name, + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + // we don't allow to delete manifests from etcd; in this case, we attempt to delete layer link + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "delete manifest", + method: http.MethodDelete, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", testImage.Name, + }, + expectedStatus: 
http.StatusNotFound, + // we don't allow to delete manifests from etcd; in this case, we attempt to delete layer link + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + } { + m.clearStats() + m.changeUnsetRepository(tc.unsetRepository) + + route := router.GetRoute(tc.endpoint).Host(serverURL.Host) + u, err := route.URL(tc.vars...) + if err != nil { + t.Errorf("[%s] failed to build route: %v", tc.name, err) + continue + } + + req, err := http.NewRequest(tc.method, u.String(), nil) + if err != nil { + t.Errorf("[%s] failed to make request: %v", tc.name, err) + } + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Errorf("[%s] failed to do the request: %v", tc.name, err) + continue + } + defer resp.Body.Close() + + if resp.StatusCode != tc.expectedStatus { + t.Errorf("[%s] unexpected status code: %v != %v", tc.name, resp.StatusCode, tc.expectedStatus) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + content, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("[%s] failed to read body: %v", tc.name, err) + } else if len(content) > 0 { + errs := errcode.Errors{} + err := errs.UnmarshalJSON(content) + if err != nil { + t.Logf("[%s] failed to parse body as error: %v", tc.name, err) + t.Logf("[%s] received body: %v", tc.name, string(content)) + } else { + t.Logf("[%s] received errors: %#+v", tc.name, errs) + } + } + } + + stats, err := m.getStats(tc.expectedMethodInvocations, time.Second*5) + if err != nil { + t.Errorf("[%s] failed to get stats: %v", tc.name, err) + } + for method, exp := range tc.expectedMethodInvocations { + invoked := stats[method] + if invoked != exp { + t.Errorf("[%s] unexpected number of invocations of method %q: %v != %v", tc.name, method, invoked, exp) + } + } + for method, invoked := range stats { + if _, ok := tc.expectedMethodInvocations[method]; !ok { + t.Errorf("[%s] unexpected method %q invoked %d times", tc.name, method, invoked) + } + } + } +} + 
+type testBlobDescriptorManager struct { + mu sync.Mutex + cond *sync.Cond + stats map[string]int + unsetRepository bool +} + +// NewTestBlobDescriptorManager allows to control blobDescriptorService and collects statistics of called +// methods. +func NewTestBlobDescriptorManager() *testBlobDescriptorManager { + m := &testBlobDescriptorManager{ + stats: make(map[string]int), + } + m.cond = sync.NewCond(&m.mu) + return m +} + +func (m *testBlobDescriptorManager) clearStats() { + m.mu.Lock() + defer m.mu.Unlock() + + for k := range m.stats { + delete(m.stats, k) + } +} + +func (m *testBlobDescriptorManager) methodInvoked(methodName string) int { + m.mu.Lock() + defer m.mu.Unlock() + + newCount := m.stats[methodName] + 1 + m.stats[methodName] = newCount + m.cond.Signal() + + return newCount +} + +// unsetRepository returns true if the testBlobDescriptorService should unset repository from context before +// passing down the call +func (m *testBlobDescriptorManager) getUnsetRepository() bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.unsetRepository +} + +// changeUnsetRepository allows to configure whether the testBlobDescriptorService should unset repository +// from context before passing down the call +func (m *testBlobDescriptorManager) changeUnsetRepository(unset bool) { + m.mu.Lock() + defer m.mu.Unlock() + + m.unsetRepository = unset +} + +// getStats waits until blob descriptor service's methods are called specified number of times and returns +// collected numbers of invocations per each method watched. 
An error will be returned if a given timeout is +// reached without satisfying minimum limits. +func (m *testBlobDescriptorManager) getStats(minimumLimits map[string]int, timeout time.Duration) (map[string]int, error) { + m.mu.Lock() + defer m.mu.Unlock() + + var err error + end := time.Now().Add(timeout) + + if len(minimumLimits) > 0 { + Loop: + for !statsGreaterThanOrEqual(m.stats, minimumLimits) { + c := make(chan struct{}) + go func() { m.cond.Wait(); c <- struct{}{} }() + + now := time.Now() + select { + case <-time.After(end.Sub(now)): + err = fmt.Errorf("timeout while waiting on expected stats") + break Loop + case <-c: + continue Loop + } + } + } + + stats := make(map[string]int) + for k, v := range m.stats { + stats[k] = v + } + + return stats, err +} + +func statsGreaterThanOrEqual(stats, minimumLimits map[string]int) bool { + for key, val := range minimumLimits { + if val > stats[key] { + return false + } + } + return true +} + +// fakeBlobDescriptorService installs a fake blob descriptor on top of blobDescriptorService that collects +// stats of method invocations. unsetRepository commands the controller to remove repository object from +// context passed down to blobDescriptorService if true. 
+func fakeBlobDescriptorService(t *testing.T) *testBlobDescriptorManager { + m := NewTestBlobDescriptorManager() + middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&testBlobDescriptorServiceFactory{t: t, m: m})) + return m +} + +type testBlobDescriptorServiceFactory struct { + t *testing.T + m *testBlobDescriptorManager +} + +func (bf *testBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService { + if _, ok := svc.(*blobDescriptorService); !ok { + svc = (&blobDescriptorServiceFactory{}).BlobAccessController(svc) + } + return &testBlobDescriptorService{BlobDescriptorService: svc, t: bf.t, m: bf.m} +} + +type testBlobDescriptorService struct { + distribution.BlobDescriptorService + t *testing.T + m *testBlobDescriptorManager +} + +func (bs *testBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + bs.m.methodInvoked("Stat") + if bs.m.getUnsetRepository() { + bs.t.Logf("unsetting repository from the context") + ctx = WithRepository(ctx, nil) + } + + return bs.BlobDescriptorService.Stat(ctx, dgst) +} +func (bs *testBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error { + bs.m.methodInvoked("Clear") + if bs.m.getUnsetRepository() { + bs.t.Logf("unsetting repository from the context") + ctx = WithRepository(ctx, nil) + } + return bs.BlobDescriptorService.Clear(ctx, dgst) +} + +const fakeAuthorizerName = "fake" + +// installFakeAccessController installs an authorizer that allows access anywhere to anybody. 
+func installFakeAccessController(t *testing.T) { + registryauth.Register(fakeAuthorizerName, registryauth.InitFunc( + func(options map[string]interface{}) (registryauth.AccessController, error) { + t.Log("instantiating fake access controller") + return &fakeAccessController{t: t}, nil + })) +} + +type fakeAccessController struct { + t *testing.T +} + +var _ registryauth.AccessController = &fakeAccessController{} + +func (f *fakeAccessController) Authorized(ctx context.Context, access ...registryauth.Access) (context.Context, error) { + for _, access := range access { + f.t.Logf("fake authorizer: authorizing access to %s:%s:%s", access.Resource.Type, access.Resource.Name, access.Action) + } + + ctx = WithAuthPerformed(ctx) + return ctx, nil +} + +func makeFakeRegistryClient(client osclient.Interface, kClient kclient.Interface) RegistryClient { + return &fakeRegistryClient{ + client: client, + kClient: kClient, + } +} + +type fakeRegistryClient struct { + client osclient.Interface + kClient kclient.Interface +} + +func (f *fakeRegistryClient) Clients() (osclient.Interface, kclient.Interface, error) { + return f.client, f.kClient, nil +} +func (f *fakeRegistryClient) SafeClientConfig() restclient.Config { + return (®istryClient{}).SafeClientConfig() +} diff --git a/pkg/dockerregistry/server/digestcache.go b/pkg/dockerregistry/server/digestcache.go index 8b1c946b812a..486275de98fc 100644 --- a/pkg/dockerregistry/server/digestcache.go +++ b/pkg/dockerregistry/server/digestcache.go @@ -2,10 +2,13 @@ package server import ( "sync" + "time" "github.com/hashicorp/golang-lru" "github.com/docker/distribution/digest" + + "k8s.io/kubernetes/pkg/util" ) // digestToRepositoryCache maps image digests to recently seen remote repositories that @@ -13,6 +16,7 @@ import ( // push old repositories out. 
type digestToRepositoryCache struct { *lru.Cache + clock util.Clock } // newDigestToRepositoryCache creates a new LRU cache of image digests to possible remote @@ -23,23 +27,42 @@ func newDigestToRepositoryCache(size int) (digestToRepositoryCache, error) { if err != nil { return digestToRepositoryCache{}, err } - return digestToRepositoryCache{Cache: c}, nil + return digestToRepositoryCache{ + Cache: c, + clock: &util.RealClock{}, + }, nil } -const bucketSize = 10 +const bucketSize = 16 // RememberDigest associates a digest with a repository. -func (c digestToRepositoryCache) RememberDigest(dgst digest.Digest, repo string) { +func (c digestToRepositoryCache) RememberDigest(dgst digest.Digest, ttl time.Duration, repo string) { key := dgst.String() value, ok := c.Get(key) if !ok { - value = &repositoryBucket{} - if ok, _ := c.ContainsOrAdd(key, value); !ok { - return + value = &repositoryBucket{clock: c.clock} + if ok, _ := c.ContainsOrAdd(key, value); ok { + // the value exists now, get it + value, ok = c.Get(key) + if !ok { + // should not happen + return + } } } repos := value.(*repositoryBucket) - repos.Add(repo) + repos.Add(ttl, repo) +} + +// ForgetDigest removes an association between given digest and repository from the cache. +func (c digestToRepositoryCache) ForgetDigest(dgst digest.Digest, repo string) { + key := dgst.String() + value, ok := c.Peek(key) + if !ok { + return + } + repos := value.(*repositoryBucket) + repos.Remove(repo) } // RepositoriesForDigest returns a list of repositories that may contain this digest. @@ -52,42 +75,125 @@ func (c digestToRepositoryCache) RepositoriesForDigest(dgst digest.Digest) []str return repos.Copy() } +func (c digestToRepositoryCache) RepositoryHasBlob(repo string, dgst digest.Digest) bool { + value, ok := c.Get(dgst.String()) + if !ok { + return false + } + repos := value.(*repositoryBucket) + return repos.Has(repo) +} + +// repositoryBucket contains a list of repositories with eviction timeouts. 
type repositoryBucket struct { - mu sync.Mutex - list []string + mu sync.Mutex + clock util.Clock + list []bucketEntry } // Has returns true if the bucket contains this repository. -func (i *repositoryBucket) Has(repo string) bool { - i.mu.Lock() - defer i.mu.Unlock() - for _, s := range i.list { - if s == repo { - return true +func (b *repositoryBucket) Has(repo string) bool { + b.mu.Lock() + defer b.mu.Unlock() + + b.evictStale() + + return b.getIndexOf(repo) >= 0 +} + +// Add one or more repositories to this bucket. +func (b *repositoryBucket) Add(ttl time.Duration, repos ...string) { + b.mu.Lock() + defer b.mu.Unlock() + + b.evictStale() + evictOn := b.clock.Now().Add(ttl) + + for _, repo := range repos { + index := b.getIndexOf(repo) + arr := b.list + + if index >= 0 { + // repository already exists, move it to the end with highest eviction time + entry := arr[index] + copy(arr[index:], arr[index+1:]) + if entry.evictOn.Before(evictOn) { + entry.evictOn = evictOn + } + arr[len(arr)-1] = entry + + } else { + // repo is a new entry + if len(arr) >= bucketSize { + copy(arr, arr[1:]) + arr = arr[:bucketSize-1] + } + arr = append(arr, bucketEntry{ + repository: repo, + evictOn: evictOn, + }) } + b.list = arr } - return false } -// Add one or more repositories to this bucket. -func (i *repositoryBucket) Add(repos ...string) { - i.mu.Lock() - defer i.mu.Unlock() - arr := i.list +// Remove removes all the given repos from repository bucket. +func (b *repositoryBucket) Remove(repos ...string) { + b.mu.Lock() + defer b.mu.Unlock() + for _, repo := range repos { - if len(arr) >= bucketSize { - arr = arr[1:] + index := b.getIndexOf(repo) + + if index >= 0 { + copy(b.list[index:], b.list[index+1:]) + b.list = b.list[:len(b.list)-1] } - arr = append(arr, repo) } - i.list = arr } -// Copy returns a copy of the contents of this bucket in a threadsafe fasion. 
-func (i *repositoryBucket) Copy() []string { - i.mu.Lock() - defer i.mu.Unlock() - out := make([]string, len(i.list)) - copy(out, i.list) +// getIndexOf returns an index of given repository in bucket's array. If not found, -1 will be returned. +func (b *repositoryBucket) getIndexOf(repo string) int { + for i, entry := range b.list { + if entry.repository == repo { + return i + } + } + + return -1 +} + +// evictStale removes stale entries from the list and shifts all the survivalists to the front. +func (b *repositoryBucket) evictStale() { + now := b.clock.Now() + arr := b.list[:0] + + for _, entry := range b.list { + if entry.evictOn.Before(now) { + continue + } + arr = append(arr, entry) + } + + b.list = arr +} + +// Copy returns a copy of the contents of this bucket in a thread-safe fashion. +func (b *repositoryBucket) Copy() []string { + b.mu.Lock() + defer b.mu.Unlock() + + b.evictStale() + + out := make([]string, len(b.list)) + for i, e := range b.list { + out[i] = e.repository + } return out } + +// bucketEntry holds a repository name with eviction timeout. 
+type bucketEntry struct { + repository string + evictOn time.Time +} diff --git a/pkg/dockerregistry/server/digestcache_test.go b/pkg/dockerregistry/server/digestcache_test.go new file mode 100644 index 000000000000..796158c4b7a1 --- /dev/null +++ b/pkg/dockerregistry/server/digestcache_test.go @@ -0,0 +1,621 @@ +package server + +import ( + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/diff" +) + +const ( + allowedDeviation = time.Millisecond * 10 + + ttl1m = time.Minute + ttl5m = time.Minute * 5 + ttl8m = time.Minute * 8 +) + +func TestRepositoryBucketAdd(t *testing.T) { + now := time.Now() + clock := util.NewFakeClock(now) + + generated := make([]bucketEntry, bucketSize) + for i := 0; i < bucketSize; i++ { + generated[i] = bucketEntry{ + repository: fmt.Sprintf("gen%d", i), + evictOn: now.Add(ttl5m), + } + } + + for _, tc := range []struct { + name string + ttl time.Duration + repos []string + entries []bucketEntry + expectedEntries []bucketEntry + }{ + { + name: "no existing entries", + ttl: ttl5m, + repos: []string{"a", "b"}, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "no entries to add", + ttl: ttl5m, + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "add few new entries", + ttl: ttl8m, + repos: []string{"bmw", "audi"}, + entries: []bucketEntry{ + { + repository: "skoda", + evictOn: now.Add(ttl5m), + }, + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "skoda", + evictOn: now.Add(ttl5m), + }, + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + { + 
repository: "bmw", + evictOn: now.Add(ttl8m), + }, + { + repository: "audi", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "add existing entry with single item", + ttl: ttl8m, + repos: []string{"apple"}, + entries: []bucketEntry{ + {repository: "apple", evictOn: now.Add(ttl5m)}, + }, + expectedEntries: []bucketEntry{ + {repository: "apple", evictOn: now.Add(ttl8m)}, + }, + }, + + { + name: "add existing entry with higher ttl", + ttl: ttl8m, + repos: []string{"apple"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + { + repository: "apple", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "add existing entry with lower ttl", + ttl: ttl5m, + repos: []string{"orange"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "add new entry with eviction", + ttl: ttl5m, + repos: []string{"banana"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + { + repository: "banana", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "all stale", + ttl: ttl5m, + repos: []string{"banana"}, + 
entries: []bucketEntry{ + { + repository: "orange", + }, + { + repository: "apple", + }, + { + repository: "pear", + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "banana", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "add multiple entries with middle ttl", + ttl: ttl5m, + repos: []string{"apple", "banana", "peach", "orange"}, + entries: []bucketEntry{ + { + repository: "melon", + }, + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl1m), + }, + { + repository: "plum", + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "pear", + evictOn: now.Add(ttl1m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "banana", + evictOn: now.Add(ttl5m), + }, + { + repository: "peach", + evictOn: now.Add(ttl5m), + }, + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "over bucket size", + ttl: ttl1m, + repos: []string{"new1", generated[2].repository, "new2", generated[4].repository}, + entries: generated, + expectedEntries: append( + append([]bucketEntry{generated[3]}, generated[5:bucketSize]...), + bucketEntry{repository: "new1", evictOn: now.Add(ttl1m)}, + generated[2], + bucketEntry{repository: "new2", evictOn: now.Add(ttl1m)}, + generated[4]), + }, + } { + b := repositoryBucket{ + clock: clock, + list: tc.entries, + } + b.Add(tc.ttl, tc.repos...) 
+ + if len(b.list) != len(tc.expectedEntries) { + t.Errorf("[%s] got unexpected number of entries in bucket: %d != %d", tc.name, len(b.list), len(tc.expectedEntries)) + } + for i := 0; i < len(b.list); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v", tc.name, i, b.list[i]) + continue + } + a, b := b.list[i], tc.expectedEntries[i] + if !bucketEntriesEqual(a, b) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v != %#+v", tc.name, i, a, b) + } + } + for i := len(b.list); i < len(tc.expectedEntries); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d missing expected entry %#+v", tc.name, i, tc.expectedEntries[i]) + } + } + } +} + +func TestRepositoryBucketAddOversize(t *testing.T) { + clock := util.NewFakeClock(time.Now()) + + b := repositoryBucket{ + clock: clock, + } + + i := 0 + for ; i < bucketSize; i++ { + ttl := time.Duration(uint64(ttl5m) * uint64(i)) + b.Add(ttl, fmt.Sprintf("%d", i)) + } + if len(b.list) != bucketSize { + t.Fatalf("unexpected number of items: %d != %d", len(b.list), bucketSize) + } + + // make first three stale + clock.Step(ttl5m * 3) + if !b.Has("3") { + t.Fatalf("bucket does not contain repository 3") + } + if len(b.list) != bucketSize-3 { + t.Fatalf("unexpected number of items: %d != %d", len(b.list), bucketSize-3) + } + + // add few repos one by one + for ; i < bucketSize+5; i++ { + ttl := time.Duration(uint64(ttl5m) * uint64(i)) + b.Add(ttl, fmt.Sprintf("%d", i)) + } + if len(b.list) != bucketSize { + t.Fatalf("unexpected number of items: %d != %d", len(b.list), bucketSize) + } + + // add few repos at once + newRepos := []string{} + for ; i < bucketSize+10; i++ { + newRepos = append(newRepos, fmt.Sprintf("%d", i)) + } + b.Add(ttl5m, newRepos...) 
+ if len(b.list) != bucketSize { + t.Fatalf("unexpected number of items: %d != %d", len(b.list), bucketSize) + } + + for j := 0; j < bucketSize; j++ { + expected := fmt.Sprintf("%d", i-bucketSize+j) + if b.list[j].repository != expected { + t.Fatalf("unexpected repository on index %d: %s != %s", j, b.list[j].repository, expected) + } + } +} + +func TestRepositoryBucketRemove(t *testing.T) { + now := time.Now() + clock := util.NewFakeClock(now) + + for _, tc := range []struct { + name string + repos []string + entries []bucketEntry + expectedEntries []bucketEntry + }{ + { + name: "no existing entries", + repos: []string{"a", "b"}, + }, + + { + name: "no matching entries", + repos: []string{"c", "d"}, + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "no entries to remove", + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "remove one matching", + repos: []string{"bmw", "skoda"}, + entries: []bucketEntry{ + { + repository: "skoda", + evictOn: now.Add(ttl5m), + }, + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "remove existing entry with single item", + repos: []string{"apple"}, + entries: []bucketEntry{ + {repository: "apple", evictOn: now.Add(ttl5m)}, + }, + expectedEntries: []bucketEntry{}, + }, + + { + name: "remove, no eviction", + repos: []string{"pear"}, + entries: []bucketEntry{ + { + repository: "orange", + }, + { + 
repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "orange", + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "remove multiple matching", + repos: []string{"orange", "apple"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + }, + } { + b := repositoryBucket{ + clock: clock, + list: tc.entries, + } + b.Remove(tc.repos...) + + if len(b.list) != len(tc.expectedEntries) { + t.Errorf("[%s] got unexpected number of entries in bucket: %d != %d", tc.name, len(b.list), len(tc.expectedEntries)) + } + for i := 0; i < len(b.list); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v", tc.name, i, b.list[i]) + continue + } + a, b := b.list[i], tc.expectedEntries[i] + if !bucketEntriesEqual(a, b) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v != %#+v", tc.name, i, a, b) + } + } + for i := len(b.list); i < len(tc.expectedEntries); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d missing expected entry %#+v", tc.name, i, tc.expectedEntries[i]) + } + } + } +} + +func TestRepositoryBucketCopy(t *testing.T) { + now := time.Now() + clock := util.NewFakeClock(now) + + ttl5m := time.Minute * 5 + for _, tc := range []struct { + name string + entries []bucketEntry + expectedRepos []string + }{ + { + name: "no entry", + expectedRepos: []string{}, + }, + + { + name: "one stale entry", + entries: []bucketEntry{ + { + repository: "1", + }, + }, + expectedRepos: []string{}, + }, + + { + name: "two entries", + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: 
now.Add(ttl5m), + }, + }, + expectedRepos: []string{"a", "b"}, + }, + } { + b := repositoryBucket{ + clock: clock, + list: tc.entries, + } + result := b.Copy() + + if !reflect.DeepEqual(result, tc.expectedRepos) { + t.Errorf("[%s] got unexpected repo list: %s", tc.name, diff.ObjectGoPrintDiff(result, tc.expectedRepos)) + } + } +} + +func bucketEntriesEqual(a, b bucketEntry) bool { + if a.repository != b.repository || a.evictOn != b.evictOn { + return false + } + return true +} diff --git a/pkg/dockerregistry/server/errorblobstore.go b/pkg/dockerregistry/server/errorblobstore.go index b1627d09c6fe..dfee01828c07 100644 --- a/pkg/dockerregistry/server/errorblobstore.go +++ b/pkg/dockerregistry/server/errorblobstore.go @@ -7,7 +7,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/reference" ) // errorBlobStore wraps a distribution.BlobStore for a particular repo. 
@@ -23,28 +23,28 @@ func (r *errorBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribu if err := r.repo.checkPendingErrors(ctx); err != nil { return distribution.Descriptor{}, err } - return r.store.Stat(ctx, dgst) + return r.store.Stat(WithRepository(ctx, r.repo), dgst) } func (r *errorBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } - return r.store.Get(ctx, dgst) + return r.store.Get(WithRepository(ctx, r.repo), dgst) } func (r *errorBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } - return r.store.Open(ctx, dgst) + return r.store.Open(WithRepository(ctx, r.repo), dgst) } func (r *errorBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { if err := r.repo.checkPendingErrors(ctx); err != nil { return distribution.Descriptor{}, err } - return r.store.Put(ctx, mediaType, p) + return r.store.Put(WithRepository(ctx, r.repo), mediaType, p) } func (r *errorBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { @@ -52,15 +52,25 @@ func (r *errorBlobStore) Create(ctx context.Context, options ...distribution.Blo return nil, err } + ctx = WithRepository(ctx, r.repo) + opts, err := effectiveCreateOptions(options) if err != nil { return nil, err } - if err := checkPendingCrossMountErrors(ctx, opts); err != nil { - context.GetLogger(r.repo.ctx).Debugf("disabling cross-mount because of pending error: %v", err) + err = checkPendingCrossMountErrors(ctx, opts) + + if err != nil { + context.GetLogger(ctx).Infof("disabling cross-repo mount because of an error: %v", err) + options = append(options, guardCreateOptions{DisableCrossMount: true}) + } else if !opts.Mount.ShouldMount { options = append(options, guardCreateOptions{DisableCrossMount: true}) } 
else { - options = append(options, guardCreateOptions{}) + context.GetLogger(ctx).Debugf("attempting cross-repo mount") + options = append(options, statCrossMountCreateOptions{ + ctx: ctx, + destRepo: r.repo, + }) } return r.store.Create(ctx, options...) @@ -70,36 +80,27 @@ func (r *errorBlobStore) Resume(ctx context.Context, id string) (distribution.Bl if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } - return r.store.Resume(ctx, id) + return r.store.Resume(WithRepository(ctx, r.repo), id) } func (r *errorBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error { if err := r.repo.checkPendingErrors(ctx); err != nil { return err } - return r.store.ServeBlob(ctx, w, req, dgst) + return r.store.ServeBlob(WithRepository(ctx, r.repo), w, req, dgst) } func (r *errorBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { if err := r.repo.checkPendingErrors(ctx); err != nil { return err } - return r.store.Delete(ctx, dgst) -} - -// Find out what the blob creation options are going to do by dry-running them -func effectiveCreateOptions(options []distribution.BlobCreateOption) (*storage.CreateOptions, error) { - opts := &storage.CreateOptions{} - for _, createOptions := range options { - err := createOptions.Apply(opts) - if err != nil { - return nil, err - } - } - return opts, nil + return r.store.Delete(WithRepository(ctx, r.repo), dgst) } -func checkPendingCrossMountErrors(ctx context.Context, opts *storage.CreateOptions) error { +// checkPendingCrossMountErrors returns true if a cross-repo mount has been requested with given create +// options. If requested and there are pending authorization errors for source repository, the error will be +// returned. Cross-repo mount must not be allowed in case of error. 
+func checkPendingCrossMountErrors(ctx context.Context, opts *distribution.CreateOptions) error {
 	if !opts.Mount.ShouldMount {
 		return nil
 	}
@@ -118,7 +119,7 @@ type guardCreateOptions struct {
 var _ distribution.BlobCreateOption = guardCreateOptions{}
 
 func (f guardCreateOptions) Apply(v interface{}) error {
-	opts, ok := v.(*storage.CreateOptions)
+	opts, ok := v.(*distribution.CreateOptions)
 	if !ok {
 		return fmt.Errorf("Unexpected create options: %#v", v)
 	}
@@ -127,3 +128,58 @@ func (f guardCreateOptions) Apply(v interface{}) error {
 	}
 	return nil
 }
+
+// statCrossMountCreateOptions ensures the expected options type is passed, and optionally pre-fills the cross-mount stat info
+type statCrossMountCreateOptions struct {
+	ctx      context.Context
+	destRepo *repository
+}
+
+var _ distribution.BlobCreateOption = statCrossMountCreateOptions{}
+
+func (f statCrossMountCreateOptions) Apply(v interface{}) error {
+	opts, ok := v.(*distribution.CreateOptions)
+	if !ok {
+		return fmt.Errorf("Unexpected create options: %#v", v)
+	}
+
+	if !opts.Mount.ShouldMount {
+		return nil
+	}
+
+	desc, err := statSourceRepository(f.ctx, f.destRepo, opts.Mount.From, opts.Mount.From.Digest())
+	if err != nil {
+		context.GetLogger(f.ctx).Infof("cannot mount blob %s from repository %s: %v - disabling cross-repo mount",
+			opts.Mount.From.Digest().String(),
+			opts.Mount.From.Name(), err)
+		opts.Mount.ShouldMount = false
+		return nil
+	}
+
+	opts.Mount.Stat = &desc
+
+	return nil
+}
+
+func statSourceRepository(
+	ctx context.Context,
+	destRepo *repository,
+	sourceRepoName reference.Named,
+	dgst digest.Digest,
+) (desc distribution.Descriptor, err error) {
+	upstreamRepo, err := dockerRegistry.Repository(ctx, sourceRepoName)
+	if err != nil {
+		return distribution.Descriptor{}, err
+	}
+	namespace, name, err := getNamespaceName(sourceRepoName.Name())
+	if err != nil {
+		return distribution.Descriptor{}, err
+	}
+
+	repo := *destRepo
+	repo.namespace = namespace
+	repo.name = name
+	repo.Repository = 
upstreamRepo + + return repo.Blobs(ctx).Stat(ctx, dgst) +} diff --git a/pkg/dockerregistry/server/pullthroughblobstore.go b/pkg/dockerregistry/server/pullthroughblobstore.go index 75b5778e762e..c54d4b072d54 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore.go +++ b/pkg/dockerregistry/server/pullthroughblobstore.go @@ -160,7 +160,7 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear if err != nil { continue } - r.repo.cachedLayers.RememberDigest(dgst, repo) + r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo) context.GetLogger(r.repo.ctx).Infof("Found digest location by search %q in %q: %v", dgst, repo, err) return desc, nil } diff --git a/pkg/dockerregistry/server/quotarestrictedblobstore.go b/pkg/dockerregistry/server/quotarestrictedblobstore.go index 5173e3259d8c..6db162c777cc 100644 --- a/pkg/dockerregistry/server/quotarestrictedblobstore.go +++ b/pkg/dockerregistry/server/quotarestrictedblobstore.go @@ -13,7 +13,6 @@ package server import ( - "fmt" "time" "github.com/Sirupsen/logrus" @@ -33,32 +32,11 @@ const ( // timeout. Caches will only be initialized if the given ttl is positive. Options are gathered from // configuration file and will be overriden by enforceQuota and projectCacheTTL environment variable values. 
func newQuotaEnforcingConfig(ctx context.Context, enforceQuota, projectCacheTTL string, options map[string]interface{}) *quotaEnforcingConfig { - buildOptionValues := func(optionName string, override string) []string { - optValues := []string{} - if value, ok := options[optionName]; ok { - var res string - switch v := value.(type) { - case string: - res = v - case bool: - res = fmt.Sprintf("%t", v) - default: - res = fmt.Sprintf("%v", v) - } - if len(res) > 0 { - optValues = append(optValues, res) - } - } - if len(override) > 0 { - optValues = append(optValues, override) - } - return optValues + enforce, err := getBoolOption(EnforceQuotaEnvVar, "enforcequota", false, options) + if err != nil { + logrus.Error(err) } - enforce := false - for _, s := range buildOptionValues("enforcequota", enforceQuota) { - enforce = s == "true" - } if !enforce { context.GetLogger(ctx).Info("quota enforcement disabled") return "aEnforcingConfig{ @@ -67,14 +45,9 @@ func newQuotaEnforcingConfig(ctx context.Context, enforceQuota, projectCacheTTL } } - ttl := defaultProjectCacheTTL - for _, s := range buildOptionValues("projectcachettl", projectCacheTTL) { - parsed, err := time.ParseDuration(s) - if err != nil { - logrus.Errorf("failed to parse project cache ttl %q: %v", s, err) - continue - } - ttl = parsed + ttl, err := getDurationOption(ProjectCacheTTLEnvVar, "projectcachettl", defaultProjectCacheTTL, options) + if err != nil { + logrus.Error(err) } if ttl <= 0 { diff --git a/pkg/dockerregistry/server/repositorymiddleware.go b/pkg/dockerregistry/server/repositorymiddleware.go index 81fd7f6c9f5b..870815575af8 100644 --- a/pkg/dockerregistry/server/repositorymiddleware.go +++ b/pkg/dockerregistry/server/repositorymiddleware.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "strings" + "time" "github.com/docker/distribution" "github.com/docker/distribution/context" @@ -29,6 +30,8 @@ import ( ) const ( + // Environment variables + // DockerRegistryURLEnvVar is a mandatory environment variable 
name specifying url of internal docker // registry. All references to pushed images will be prefixed with its value. DockerRegistryURLEnvVar = "DOCKER_REGISTRY_URL" @@ -46,11 +49,27 @@ const ( // AcceptSchema2EnvVar is a boolean environment variable that allows to accept manifest schema v2 // on manifest put requests. AcceptSchema2EnvVar = "REGISTRY_MIDDLEWARE_REPOSITORY_OPENSHIFT_ACCEPTSCHEMA2" + + // BlobRepositoryCacheTTLEnvVar is an environment variable specifying an eviction timeout for entries. The higher the value, the faster queries but also a higher risk of + // leaking a blob that is no longer tagged in given repository. + BlobRepositoryCacheTTLEnvVar = "REGISTRY_MIDDLEWARE_REPOSITORY_OPENSHIFT_BLOBREPOSITORYCACHETTL" + + // Default values + + defaultDigestToRepositoryCacheSize = 2048 + defaultBlobRepositoryCacheTTL = time.Minute * 10 ) var ( - // cachedLayers is a shared cache of blob digests to remote repositories that have previously - // been identified as containing that blob. Thread safe and reused by all middleware layers. + // cachedLayers is a shared cache of blob digests to repositories that have previously been identified as + // containing that blob. Thread safe and reused by all middleware layers. It contains two kinds of + // associations: + // 1. <-> // + // 2. <-> / + // The first associates a blob with a remote repository. Such an entry is set and used by pullthrough + // middleware. The second associates a blob with a local repository. Such a blob is expected to reside on + // local storage. It's set and used by blobDescriptorService middleware. cachedLayers digestToRepositoryCache // secureTransport is the transport pool used for pullthrough to remote registries marked as // secure. 
@@ -64,7 +83,7 @@ var ( ) func init() { - cache, err := newDigestToRepositoryCache(1024) + cache, err := newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize) if err != nil { panic(err) } @@ -89,6 +108,7 @@ func init() { if quotaEnforcing == nil { quotaEnforcing = newQuotaEnforcingConfig(ctx, os.Getenv(EnforceQuotaEnvVar), os.Getenv(ProjectCacheTTLEnvVar), options) } + return newRepositoryWithClient(registryOSClient, kClient, kClient, ctx, repo, options) }, ) @@ -118,6 +138,8 @@ type repository struct { pullthrough bool // acceptschema2 allows to refuse the manifest schema version 2 acceptschema2 bool + // blobrepositorycachettl is an eviction timeout for entries of cachedLayers + blobrepositorycachettl time.Duration // cachedLayers remembers a mapping of layer digest to repositories recently seen with that image to avoid // having to check every potential upstream repository when a blob request is made. The cache is useful only // when session affinity is on for the registry, but in practice the first pull will fill the cache. 
@@ -140,14 +162,17 @@ func newRepositoryWithClient( return nil, fmt.Errorf("%s is required", DockerRegistryURLEnvVar) } - pullthrough := getBoolOption("pullthrough", false, options) - - acceptschema2 := false - - if os.Getenv(AcceptSchema2EnvVar) != "" { - acceptschema2 = os.Getenv(AcceptSchema2EnvVar) == "true" - } else { - acceptschema2 = getBoolOption("acceptschema2", false, options) + acceptschema2, err := getBoolOption(AcceptSchema2EnvVar, "acceptschema2", false, options) + if err != nil { + context.GetLogger(ctx).Error(err) + } + blobrepositorycachettl, err := getDurationOption(BlobRepositoryCacheTTLEnvVar, "blobrepositorycachettl", defaultBlobRepositoryCacheTTL, options) + if err != nil { + context.GetLogger(ctx).Error(err) + } + pullthrough, err := getBoolOption("", "pullthrough", false, options) + if err != nil { + context.GetLogger(ctx).Error(err) } nameParts := strings.SplitN(repo.Named().Name(), "/", 2) @@ -158,29 +183,20 @@ func newRepositoryWithClient( return &repository{ Repository: repo, - ctx: ctx, - quotaClient: quotaClient, - limitClient: limitClient, - registryOSClient: registryOSClient, - registryAddr: registryAddr, - namespace: nameParts[0], - name: nameParts[1], - pullthrough: pullthrough, - acceptschema2: acceptschema2, - cachedLayers: cachedLayers, + ctx: ctx, + quotaClient: quotaClient, + limitClient: limitClient, + registryOSClient: registryOSClient, + registryAddr: registryAddr, + namespace: nameParts[0], + name: nameParts[1], + acceptschema2: acceptschema2, + blobrepositorycachettl: blobrepositorycachettl, + pullthrough: pullthrough, + cachedLayers: cachedLayers, }, nil } -func getBoolOption(name string, defval bool, options map[string]interface{}) bool { - if value, ok := options[name]; ok { - var b bool - if b, ok = value.(bool); ok { - return b - } - } - return defval -} - // Manifests returns r, which implements distribution.ManifestService. 
func (r *repository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { if r.ctx == ctx { @@ -271,7 +287,16 @@ func (r *repository) Get(ctx context.Context, dgst digest.Digest, options ...dis } ref := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name, Registry: r.registryAddr} - manifest, err := r.manifestFromImageWithCachedLayers(image, ref.DockerClientDefaults().Exact()) + if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; managed == "true" { + // Repository without a registry part is refers to repository containing locally managed images. + // Such an entry is retrieved, checked and set by blobDescriptorService operating only on local blobs. + ref.Registry = "" + } else { + // Repository with a registry points to remote repository. This is used by pullthrough middleware. + ref = ref.DockerClientDefaults().AsRepository() + } + + manifest, err := r.manifestFromImageWithCachedLayers(image, ref.Exact()) return manifest, err } @@ -485,6 +510,7 @@ func (r *repository) Delete(ctx context.Context, dgst digest.Digest) error { if err != nil { return err } + ctx = WithRepository(ctx, r) return ms.Delete(ctx, dgst) } @@ -516,30 +542,57 @@ func (r *repository) getImageStreamImage(dgst digest.Digest) (*imageapi.ImageStr return r.registryOSClient.ImageStreamImages(r.namespace).Get(r.name, dgst.String()) } -// rememberLayers caches the provided layers -func (r *repository) rememberLayers(manifest distribution.Manifest, cacheName string) { - if !r.pullthrough { +// rememberLayersOfImage caches the layer digests of given image +func (r *repository) rememberLayersOfImage(image *imageapi.Image, cacheName string) { + if len(image.DockerImageLayers) == 0 && len(image.DockerImageManifestMediaType) > 0 { + // image has no layers return } + + if len(image.DockerImageLayers) > 0 { + for _, layer := range image.DockerImageLayers { + 
r.cachedLayers.RememberDigest(digest.Digest(layer.Name), r.blobrepositorycachettl, cacheName) + } + return + } + + manifest, err := r.getManifestFromImage(image) + if err != nil { + context.GetLogger(r.ctx).Errorf("cannot remember layers of image %q: %v", image.Name, err) + return + } + r.rememberLayersOfManifest(manifest, cacheName) +} + +// rememberLayersOfManifest caches the layer digests of given manifest +func (r *repository) rememberLayersOfManifest(manifest distribution.Manifest, cacheName string) { // remember the layers in the cache as an optimization to avoid searching all remote repositories for _, layer := range manifest.References() { - r.cachedLayers.RememberDigest(layer.Digest, cacheName) + r.cachedLayers.RememberDigest(layer.Digest, r.blobrepositorycachettl, cacheName) } } // manifestFromImageWithCachedLayers loads the image and then caches any located layers func (r *repository) manifestFromImageWithCachedLayers(image *imageapi.Image, cacheName string) (manifest distribution.Manifest, err error) { - if image.DockerImageManifestMediaType == schema2.MediaTypeManifest { - manifest, err = r.deserializedManifestFromImage(image) - } else { - manifest, err = r.signedManifestFromImage(image) - } - + manifest, err = r.getManifestFromImage(image) if err != nil { return } - r.rememberLayers(manifest, cacheName) + r.rememberLayersOfManifest(manifest, cacheName) + return +} + +// getManifestFromImage returns a manifest object constructed from a blob stored in the given image. 
+func (r *repository) getManifestFromImage(image *imageapi.Image) (manifest distribution.Manifest, err error) { + switch image.DockerImageManifestMediaType { + case schema2.MediaTypeManifest: + manifest, err = deserializedManifestFromImage(image) + case schema1.MediaTypeManifest, "": + manifest, err = r.signedManifestFromImage(image) + default: + err = regapi.ErrorCodeManifestInvalid.WithDetail(fmt.Errorf("unknown manifest media type %q", image.DockerImageManifestMediaType)) + } return } diff --git a/pkg/dockerregistry/server/repositorymiddleware_test.go b/pkg/dockerregistry/server/repositorymiddleware_test.go new file mode 100644 index 000000000000..3ce40d6a4c2a --- /dev/null +++ b/pkg/dockerregistry/server/repositorymiddleware_test.go @@ -0,0 +1,782 @@ +package server + +import ( + "fmt" + "io" + "reflect" + "strings" + "testing" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/inmemory" + "github.com/docker/libtrust" + + kapi "k8s.io/kubernetes/pkg/api" + kerrors "k8s.io/kubernetes/pkg/api/errors" + ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/diff" + + "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/client/testclient" + registrytest "github.com/openshift/origin/pkg/dockerregistry/testutil" + imagetest "github.com/openshift/origin/pkg/image/admission/testutil" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +const ( + // 
testImageLayerCount says how many layers to generate per image + testImageLayerCount = 2 + testBlobRepositoryCacheTTL = time.Millisecond * 500 +) + +func TestRepositoryBlobStat(t *testing.T) { + quotaEnforcing = &quotaEnforcingConfig{} + + ctx := context.Background() + // this driver holds all the testing blobs in memory during the whole test run + driver := inmemory.New() + // generate two images and store their blobs in the driver + testImages, err := populateTestStorage(t, driver, true, 1, map[string]int{"nm/is:latest": 1, "nm/repo:missing-layer-links": 1}, nil) + if err != nil { + t.Fatal(err) + } + // generate an image and store its blobs in the driver; the resulting image will lack managed by openshift + // annotation + testImages, err = populateTestStorage(t, driver, false, 1, map[string]int{"nm/unmanaged:missing-layer-links": 1}, testImages) + if err != nil { + t.Fatal(err) + } + + // remove layer repository links from two of the above images; keep the uploaded blobs in the global + // blobstore though + for _, name := range []string{"nm/repo:missing-layer-links", "nm/unmanaged:missing-layer-links"} { + repoName := strings.Split(name, ":")[0] + for _, layer := range testImages[name][0].DockerImageLayers { + dgst := digest.Digest(layer.Name) + alg, hex := dgst.Algorithm(), dgst.Hex() + err := driver.Delete(ctx, fmt.Sprintf("/docker/registry/v2/repositories/%s/_layers/%s/%s", repoName, alg, hex)) + if err != nil { + t.Fatalf("failed to delete layer link %q from repository %q: %v", layer.Name, repoName, err) + } + } + } + + // generate random images without storing its blobs in the driver + etcdOnlyImages := map[string]*imageapi.Image{} + for _, d := range []struct { + name string + managed bool + }{{"nm/is", true}, {"registry.org:5000/user/app", false}} { + img, err := registrytest.NewImageForManifest(d.name, registrytest.SampleImageManifestSchema1, d.managed) + if err != nil { + t.Fatal(err) + } + etcdOnlyImages[d.name] = img + } + + for _, tc := range []struct {
+ name string + stat string + images []imageapi.Image + imageStreams []imageapi.ImageStream + pullthrough bool + skipAuth bool + deferredErrors deferredErrors + expectedDescriptor distribution.Descriptor + expectedError error + expectedActions []clientAction + }{ + { + name: "local stat", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + imageStreams: []imageapi.ImageStream{{ObjectMeta: kapi.ObjectMeta{Namespace: "nm", Name: "is"}}}, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/is:latest"][0].DockerImageLayers[0]), + }, + + { + name: "blob only tagged in image stream", + stat: "nm/repo@" + testImages["nm/repo:missing-layer-links"][0].DockerImageLayers[1].Name, + images: []imageapi.Image{*testImages["nm/repo:missing-layer-links"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "repo", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/repo:missing-layer-links"][0].Name, + }, + }, + }, + }, + }, + }, + }, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/repo:missing-layer-links"][0].DockerImageLayers[1]), + expectedActions: []clientAction{{"get", "imagestreams"}, {"get", "images"}}, + }, + + { + name: "blob referenced only by not managed image with pullthrough on", + stat: "nm/unmanaged@" + testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1].Name, + images: []imageapi.Image{*testImages["nm/unmanaged:missing-layer-links"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "unmanaged", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/unmanaged:missing-layer-links"][0].Name, + }, + }, + }, + }, + }, + }, + }, + pullthrough: true, + expectedDescriptor: 
testNewDescriptorForLayer(testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1]), + expectedActions: []clientAction{{"get", "imagestreams"}, {"get", "images"}}, + }, + + { + // TODO: this should err out because of missing image stream. + // Unfortunately, it's not the case. Until we start storing layer links in etcd, we depend on + // local layer links. + name: "layer link present while image stream not found", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + images: []imageapi.Image{*testImages["nm/is:latest"][0]}, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/is:latest"][0].DockerImageLayers[0]), + }, + + { + name: "blob only tagged by not managed image with pullthrough off", + stat: "nm/repo@" + testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1].Name, + images: []imageapi.Image{*testImages["nm/unmanaged:missing-layer-links"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "repo", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1].Name, + }, + }, + }, + }, + }, + }, + }, + expectedError: distribution.ErrBlobUnknown, + expectedActions: []clientAction{{"get", "imagestreams"}, {"get", "images"}}, + }, + + { + name: "blob not stored locally but referred in image stream", + stat: "nm/is@" + etcdOnlyImages["nm/is"].DockerImageLayers[1].Name, + images: []imageapi.Image{*etcdOnlyImages["nm/is"]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "is", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: etcdOnlyImages["nm/is"].Name, + }, + }, + }, + }, + }, + }, + }, + expectedError: distribution.ErrBlobUnknown, + }, + + { + name: 
"blob does not exist", + stat: "nm/repo@" + etcdOnlyImages["nm/is"].DockerImageLayers[0].Name, + images: []imageapi.Image{*testImages["nm/is:latest"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "repo", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/is:latest"][0].Name, + }, + }, + }, + }, + }, + }, + }, + expectedError: distribution.ErrBlobUnknown, + }, + + { + name: "auth not performed", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + imageStreams: []imageapi.ImageStream{{ObjectMeta: kapi.ObjectMeta{Namespace: "nm", Name: "is"}}}, + skipAuth: true, + expectedError: fmt.Errorf("openshift.auth.completed missing from context"), + }, + + { + name: "deferred error", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + imageStreams: []imageapi.ImageStream{{ObjectMeta: kapi.ObjectMeta{Namespace: "nm", Name: "is"}}}, + deferredErrors: deferredErrors{"nm/is": ErrOpenShiftAccessDenied}, + expectedError: ErrOpenShiftAccessDenied, + }, + } { + ref, err := reference.Parse(tc.stat) + if err != nil { + t.Errorf("[%s] failed to parse blob reference %q: %v", tc.name, tc.stat, err) + continue + } + canonical, ok := ref.(reference.Canonical) + if !ok { + t.Errorf("[%s] not a canonical reference %q", tc.name, ref.String()) + continue + } + + cachedLayers, err = newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize) + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + if !tc.skipAuth { + ctx = WithAuthPerformed(ctx) + } + if tc.deferredErrors != nil { + ctx = WithDeferredErrors(ctx, tc.deferredErrors) + } + + client := &testclient.Fake{} + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, tc.imageStreams...)) + client.AddReactor("get", "images", getFakeImageGetHandler(t, tc.images...)) + + reg, err := 
newTestRegistry(ctx, client, driver, defaultBlobRepositoryCacheTTL, tc.pullthrough, true) + if err != nil { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + continue + } + + repo, err := reg.Repository(ctx, canonical) + if err != nil { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + continue + } + + desc, err := repo.Blobs(ctx).Stat(ctx, canonical.Digest()) + if err != nil && tc.expectedError == nil { + t.Errorf("[%s] got unexpected stat error: %v", tc.name, err) + continue + } + if err == nil && tc.expectedError != nil { + t.Errorf("[%s] got unexpected non-error", tc.name) + continue + } + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("[%s] got unexpected error: %s", tc.name, diff.ObjectGoPrintDiff(err, tc.expectedError)) + continue + } + if tc.expectedError == nil && !reflect.DeepEqual(desc, tc.expectedDescriptor) { + t.Errorf("[%s] got unexpected descriptor: %s", tc.name, diff.ObjectGoPrintDiff(desc, tc.expectedDescriptor)) + } + + compareActions(t, tc.name, client.Actions(), tc.expectedActions) + } +} + +func TestRepositoryBlobStatCacheEviction(t *testing.T) { + const blobRepoCacheTTL = time.Millisecond * 500 + + quotaEnforcing = &quotaEnforcingConfig{} + ctx := WithAuthPerformed(context.Background()) + + // this driver holds all the testing blobs in memory during the whole test run + driver := inmemory.New() + // generate two images and store their blobs in the driver + testImages, err := populateTestStorage(t, driver, true, 1, map[string]int{"nm/is:latest": 1}, nil) + if err != nil { + t.Fatal(err) + } + testImage := testImages["nm/is:latest"][0] + testImageStream := testNewImageStreamObject("nm", "is", "latest", testImage.Name) + + blob1Desc := testNewDescriptorForLayer(testImage.DockerImageLayers[0]) + blob1Dgst := blob1Desc.Digest + blob2Desc := testNewDescriptorForLayer(testImage.DockerImageLayers[1]) + blob2Dgst := blob2Desc.Digest + + // remove the layer repo link of the image's second blob + alg, hex := blob2Dgst.Algorithm(),
blob2Dgst.Hex() + err = driver.Delete(ctx, fmt.Sprintf("/docker/registry/v2/repositories/%s/_layers/%s/%s", "nm/is", alg, hex)) + + cachedLayers, err = newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize) + if err != nil { + t.Fatal(err) + } + + client := &testclient.Fake{} + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, *testImageStream)) + client.AddReactor("get", "images", getFakeImageGetHandler(t, *testImage)) + + reg, err := newTestRegistry(ctx, client, driver, blobRepoCacheTTL, false, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ref, err := reference.ParseNamed("nm/is") + if err != nil { + t.Errorf("failed to parse blob reference %q: %v", "nm/is", err) + } + + repo, err := reg.Repository(ctx, ref) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // hit the layer repo link - cache the result + desc, err := repo.Blobs(ctx).Stat(ctx, blob1Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob1Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob1Desc) + } + + compareActions(t, "no actions expected", client.Actions(), []clientAction{}) + + // remove layer repo link, delete the association from cache as well + err = repo.Blobs(ctx).Delete(ctx, blob1Dgst) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + // query etcd + desc, err = repo.Blobs(ctx).Stat(ctx, blob1Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob1Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob1Desc) + } + + expectedActions := []clientAction{{"get", "imagestreams"}, {"get", "images"}} + compareActions(t, "1st roundtrip to etcd", client.Actions(), expectedActions) + + // remove the underlying blob + vacuum := storage.NewVacuum(ctx, driver) + err = vacuum.RemoveBlob(blob1Dgst.String()) + if err != nil { + t.Fatalf("got unexpected 
error: %v", err) + } + + // fail because the blob isn't stored locally + desc, err = repo.Blobs(ctx).Stat(ctx, blob1Dgst) + if err == nil { + t.Fatalf("got unexpected non error: %v", err) + } + if err != distribution.ErrBlobUnknown { + t.Fatalf("got unexpected error: %#+v", err) + } + + // cache hit - don't query etcd + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob2Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob2Desc) + } + + compareActions(t, "no etcd query", client.Actions(), expectedActions) + + lastStatTimestamp := time.Now() + + // hit the cache + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob2Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob2Desc) + } + + // cache hit - no additional etcd query + compareActions(t, "no roundrip to etcd", client.Actions(), expectedActions) + + t.Logf("sleeping %s while waiting for eviction of blob %q from cache", blobRepoCacheTTL.String(), blob2Dgst.String()) + time.Sleep(blobRepoCacheTTL - (time.Now().Sub(lastStatTimestamp))) + + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob2Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob2Desc) + } + + expectedActions = append(expectedActions, []clientAction{{"get", "imagestreams"}, {"get", "images"}}...) 
+ compareActions(t, "2nd roundtrip to etcd", client.Actions(), expectedActions) + + err = vacuum.RemoveBlob(blob2Dgst.String()) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + // fail because the blob isn't stored locally + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err == nil { + t.Fatalf("got unexpected non error: %v", err) + } + if err != distribution.ErrBlobUnknown { + t.Fatalf("got unexpected error: %#+v", err) + } +} + +type clientAction struct { + verb string + resource string +} + +func getFakeImageGetHandler(t *testing.T, iss ...imageapi.Image) ktestclient.ReactionFunc { + return func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) { + switch a := action.(type) { + case ktestclient.GetAction: + for _, is := range iss { + if a.GetName() == is.Name { + t.Logf("images get handler: returning image %s", is.Name) + return true, &is, nil + } + } + + err := kerrors.NewNotFound(kapi.Resource("images"), a.GetName()) + t.Logf("image get handler: %v", err) + return true, nil, err + } + return false, nil, nil + } +} + +func storeTestImage( + ctx context.Context, + reg distribution.Namespace, + imageReference reference.NamedTagged, + schemaVersion int, + managedByOpenShift bool, +) (*imageapi.Image, error) { + repo, err := reg.Repository(ctx, imageReference) + if err != nil { + return nil, fmt.Errorf("unexpected error getting repo %q: %v", imageReference.Name(), err) + } + + var ( + m distribution.Manifest + m1 schema1.Manifest + ) + switch schemaVersion { + case 1: + m1 = schema1.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: imageReference.Name(), + Tag: imageReference.Tag(), + } + case 2: + // TODO + fallthrough + default: + return nil, fmt.Errorf("unsupported manifest version %d", schemaVersion) + } + + for i := 0; i < testImageLayerCount; i++ { + rs, ds, err := registrytest.CreateRandomTarFile() + if err != nil { + return nil, fmt.Errorf("unexpected error generating test layer 
file: %v", err) + } + dgst := digest.Digest(ds) + + wr, err := repo.Blobs(ctx).Create(ctx) + if err != nil { + return nil, fmt.Errorf("unexpected error creating test upload: %v", err) + } + defer wr.Close() + + n, err := io.Copy(wr, rs) + if err != nil { + return nil, fmt.Errorf("unexpected error copying to upload: %v", err) + } + + if schemaVersion == 1 { + m1.FSLayers = append(m1.FSLayers, schema1.FSLayer{BlobSum: dgst}) + m1.History = append(m1.History, schema1.History{V1Compatibility: fmt.Sprintf(`{"size":%d}`, n)}) + } // TODO v2 + + if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst, MediaType: schema1.MediaTypeManifestLayer}); err != nil { + return nil, fmt.Errorf("unexpected error finishing upload: %v", err) + } + } + + var dgst digest.Digest + var payload []byte + + if schemaVersion == 1 { + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + return nil, fmt.Errorf("unexpected error generating private key: %v", err) + } + + m, err = schema1.Sign(&m1, pk) + if err != nil { + return nil, fmt.Errorf("error signing manifest: %v", err) + } + + _, payload, err = m.Payload() + if err != nil { + return nil, fmt.Errorf("error getting payload %#v", err) + } + + dgst = digest.FromBytes(payload) + } //TODO v2 + + image := &imageapi.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: dgst.String(), + }, + DockerImageManifest: string(payload), + DockerImageReference: imageReference.Name() + "@" + dgst.String(), + } + + if managedByOpenShift { + image.Annotations = map[string]string{imageapi.ManagedByOpenShiftAnnotation: "true"} + } + + if schemaVersion == 1 { + signedManifest := m.(*schema1.SignedManifest) + signatures, err := signedManifest.Signatures() + if err != nil { + return nil, err + } + + for _, signDigest := range signatures { + image.DockerImageSignatures = append(image.DockerImageSignatures, signDigest) + } + } + + err = imageapi.ImageWithMetadata(image) + if err != nil { + return nil, fmt.Errorf("failed to fill image with metadata: 
%v", err) + } + + return image, nil +} + +func populateTestStorage( + t *testing.T, + driver driver.StorageDriver, + setManagedByOpenShift bool, + schemaVersion int, + repoImages map[string]int, + testImages map[string][]*imageapi.Image, +) (map[string][]*imageapi.Image, error) { + ctx := context.Background() + reg, err := storage.NewRegistry(ctx, driver) + if err != nil { + t.Fatalf("error creating registry: %v", err) + } + + result := make(map[string][]*imageapi.Image) + for key, value := range testImages { + images := make([]*imageapi.Image, len(value)) + copy(images, value) + result[key] = images + } + + for imageReference := range repoImages { + parsed, err := reference.Parse(imageReference) + if err != nil { + t.Fatalf("failed to parse reference %q: %v", imageReference, err) + } + namedTagged, ok := parsed.(reference.NamedTagged) + if !ok { + t.Fatalf("expected NamedTagged reference, not %T", parsed) + } + + imageCount := repoImages[imageReference] + + for i := 0; i < imageCount; i++ { + img, err := storeTestImage(ctx, reg, namedTagged, schemaVersion, setManagedByOpenShift) + if err != nil { + t.Fatal(err) + } + arr := result[imageReference] + t.Logf("created image %s@%s image with layers:", namedTagged.Name(), img.Name) + for _, l := range img.DockerImageLayers { + t.Logf(" %s of size %d", l.Name, l.LayerSize) + } + result[imageReference] = append(arr, img) + } + } + + return result, nil +} + +func newTestRegistry( + ctx context.Context, + osClient client.Interface, + storageDriver driver.StorageDriver, + blobrepositorycachettl time.Duration, + pullthrough bool, + useBlobDescriptorCacheProvider bool, +) (*testRegistry, error) { + if storageDriver == nil { + storageDriver = inmemory.New() + } + dockerStorageDriver = storageDriver + + opts := []storage.RegistryOption{ + storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{}), + storage.EnableDelete, + storage.EnableRedirect, + } + if useBlobDescriptorCacheProvider { + cacheProvider := 
cache.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()) + opts = append(opts, storage.BlobDescriptorCacheProvider(cacheProvider)) + } + + reg, err := storage.NewRegistry(ctx, dockerStorageDriver, opts...) + if err != nil { + return nil, err + } + dockerRegistry = reg + + return &testRegistry{ + Namespace: dockerRegistry, + osClient: osClient, + blobrepositorycachettl: blobrepositorycachettl, + pullthrough: pullthrough, + }, nil +} + +type testRegistry struct { + distribution.Namespace + osClient client.Interface + pullthrough bool + blobrepositorycachettl time.Duration +} + +var _ distribution.Namespace = &testRegistry{} + +func (r *testRegistry) Repository(ctx context.Context, ref reference.Named) (distribution.Repository, error) { + repo, err := r.Namespace.Repository(ctx, ref) + if err != nil { + return nil, err + } + + kFakeClient := ktestclient.NewSimpleFake() + + parts := strings.SplitN(ref.Name(), "/", 3) + if len(parts) != 2 { + return nil, fmt.Errorf("failed to parse repository name %q", ref.Name()) + } + + return &repository{ + Repository: repo, + + ctx: ctx, + quotaClient: kFakeClient, + limitClient: kFakeClient, + registryOSClient: r.osClient, + registryAddr: "localhost:5000", + namespace: parts[0], + name: parts[1], + blobrepositorycachettl: r.blobrepositorycachettl, + cachedLayers: cachedLayers, + pullthrough: r.pullthrough, + }, nil +} + +func testNewImageStreamObject(namespace, name, tag, imageName string) *imageapi.ImageStream { + return &imageapi.ImageStream{ + ObjectMeta: kapi.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + tag: { + Items: []imageapi.TagEvent{ + { + Image: imageName, + }, + }, + }, + }, + }, + } +} + +func testNewDescriptorForLayer(layer imageapi.ImageLayer) distribution.Descriptor { + return distribution.Descriptor{ + Digest: digest.Digest(layer.Name), + MediaType: "application/octet-stream", + Size: 
layer.LayerSize, + } +} + +func compareActions(t *testing.T, testCaseName string, actions []ktestclient.Action, expectedActions []clientAction) { + for i, action := range actions { + if i >= len(expectedActions) { + t.Errorf("[%s] got unexpected client action: %#+v", testCaseName, action) + continue + } + expected := expectedActions[i] + if !action.Matches(expected.verb, expected.resource) { + t.Errorf("[%s] expected client action %s[%s], got instead: %#+v", testCaseName, expected.verb, expected.resource, action) + } + } + for i := len(actions); i < len(expectedActions); i++ { + expected := expectedActions[i] + t.Errorf("[%s] expected action %s[%s] did not happen", testCaseName, expected.verb, expected.resource) + } +} diff --git a/pkg/dockerregistry/server/token.go b/pkg/dockerregistry/server/token.go index 465f0da1cda0..c5dfb63ab94c 100644 --- a/pkg/dockerregistry/server/token.go +++ b/pkg/dockerregistry/server/token.go @@ -17,7 +17,7 @@ type tokenHandler struct { } // NewTokenHandler returns a handler that implements the docker token protocol -func NewTokenHandler(ctx context.Context, client *RegistryClient) http.Handler { +func NewTokenHandler(ctx context.Context, client RegistryClient) http.Handler { return &tokenHandler{ ctx: ctx, anonymousConfig: client.SafeClientConfig(), diff --git a/pkg/dockerregistry/server/util.go b/pkg/dockerregistry/server/util.go new file mode 100644 index 000000000000..9f5fdfd3e585 --- /dev/null +++ b/pkg/dockerregistry/server/util.go @@ -0,0 +1,137 @@ +package server + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/manifest/schema2" + + imageapi "github.com/openshift/origin/pkg/image/api" +) + +// Context keys +const ( + // repositoryKey serves to store/retrieve repository object to/from context. 
+ repositoryKey = "openshift.repository" +) + +func WithRepository(parent context.Context, repo *repository) context.Context { + return context.WithValue(parent, repositoryKey, repo) +} +func RepositoryFrom(ctx context.Context) (repo *repository, found bool) { + repo, found = ctx.Value(repositoryKey).(*repository) + return +} + +func getOptionValue( + envVar string, + optionName string, + defval interface{}, + options map[string]interface{}, + conversionFunc func(v interface{}) (interface{}, error), +) (value interface{}, err error) { + value = defval + if optValue, ok := options[optionName]; ok { + converted, convErr := conversionFunc(optValue) + if convErr != nil { + err = fmt.Errorf("config option %q: invalid value: %v", optionName, convErr) + } else { + value = converted + } + } + + if len(envVar) == 0 { + return + } + envValue := os.Getenv(envVar) + if len(envValue) == 0 { + return + } + + converted, convErr := conversionFunc(envValue) + if convErr != nil { + err = fmt.Errorf("invalid value of environment variable %s: %v", envVar, convErr) + } else { + value = converted + } + + return +} + +func getBoolOption(envVar string, optionName string, defval bool, options map[string]interface{}) (bool, error) { + value, err := getOptionValue(envVar, optionName, defval, options, func(value interface{}) (b interface{}, err error) { + switch t := value.(type) { + case bool: + return t, nil + case string: + switch strings.ToLower(t) { + case "true": + return true, nil + case "false": + return false, nil + } + } + return defval, fmt.Errorf("%#+v is not a boolean", value) + }) + + return value.(bool), err +} + +func getDurationOption(envVar string, optionName string, defval time.Duration, options map[string]interface{}) (time.Duration, error) { + value, err := getOptionValue(envVar, optionName, defval, options, func(value interface{}) (d interface{}, err error) { + s, ok := value.(string) + if !ok { + return defval, fmt.Errorf("expected string, not %T", value) + } + + 
parsed, err := time.ParseDuration(s) + if err != nil { + return defval, fmt.Errorf("parse duration error: %v", err) + } + return parsed, nil + }) + + return value.(time.Duration), err +} + +// deserializedManifestFromImage converts an Image to a DeserializedManifest. +func deserializedManifestFromImage(image *imageapi.Image) (*schema2.DeserializedManifest, error) { + var manifest schema2.DeserializedManifest + if err := json.Unmarshal([]byte(image.DockerImageManifest), &manifest); err != nil { + return nil, err + } + return &manifest, nil +} + +func getNamespaceName(resourceName string) (string, string, error) { + repoParts := strings.SplitN(resourceName, "/", 2) + if len(repoParts) != 2 { + return "", "", ErrNamespaceRequired + } + ns := repoParts[0] + if len(ns) == 0 { + return "", "", ErrNamespaceRequired + } + name := repoParts[1] + if len(name) == 0 { + return "", "", ErrNamespaceRequired + } + return ns, name, nil +} + +// effectiveCreateOptions find out what the blob creation options are going to do by dry-running them. 
+func effectiveCreateOptions(options []distribution.BlobCreateOption) (*distribution.CreateOptions, error) { + opts := &distribution.CreateOptions{} + for _, createOptions := range options { + err := createOptions.Apply(opts) + if err != nil { + return nil, err + } + } + return opts, nil +} diff --git a/pkg/dockerregistry/server/util_test.go b/pkg/dockerregistry/server/util_test.go new file mode 100644 index 000000000000..79bd8f9d4485 --- /dev/null +++ b/pkg/dockerregistry/server/util_test.go @@ -0,0 +1,233 @@ +package server + +import ( + "os" + "testing" + "time" +) + +func TestGetBoolOption(t *testing.T) { + for _, tc := range []struct { + name string + envName string + exportEnv map[string]string + option string + options map[string]interface{} + defaultValue bool + expected bool + expectedError bool + }{ + { + name: "default to false", + defaultValue: false, + option: "opt", + expected: false, + }, + + { + name: "default to true", + defaultValue: true, + option: "opt", + envName: "VBOOL", + expected: true, + }, + + { + name: "given option", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "true"}, + expected: true, + }, + + { + name: "env value with missing option", + defaultValue: true, + option: "opt", + envName: "VBOOL", + exportEnv: map[string]string{"VBOOL": "false"}, + expected: false, + }, + + { + name: "given option and env var", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "false"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "true"}, + expected: true, + }, + + { + name: "disable with env var", + defaultValue: true, + option: "opt", + options: map[string]interface{}{"opt": "true"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "false"}, + expected: false, + }, + + { + name: "given option with bad env value", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "true"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "falsed"}, 
+ expected: true, + expectedError: true, + }, + + { + name: "env value with wrong option type", + defaultValue: true, + option: "opt", + options: map[string]interface{}{"opt": 1}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "true"}, + expected: true, + expectedError: true, + }, + + { + name: "env value with bad option value", + defaultValue: true, + option: "opt", + options: map[string]interface{}{"opt": "falsed"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "false"}, + expected: false, + expectedError: true, + }, + + { + name: "bad env value with bad option value", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "turk"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "truth"}, + expected: false, + expectedError: true, + }, + } { + for key, value := range tc.exportEnv { + os.Setenv(key, value) + } + d, err := getBoolOption(tc.envName, tc.option, tc.defaultValue, tc.options) + if err == nil && tc.expectedError { + t.Errorf("[%s] unexpected non-error", tc.name) + } else if err != nil && !tc.expectedError { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + } + if d != tc.expected { + t.Errorf("[%s] got unexpected duration: %t != %t", tc.name, d, tc.expected) + } + } +} + +func TestGetDurationOption(t *testing.T) { + for _, tc := range []struct { + name string + envName string + exportEnv map[string]string + option string + options map[string]interface{} + defaultValue time.Duration + expectedDuration time.Duration + expectedError bool + }{ + { + name: "no option, no env", + defaultValue: time.Minute, + option: "opt", + expectedDuration: time.Minute, + }, + + { + name: "given option", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "4000ns"}, + expectedDuration: 4000, + }, + + { + name: "env value with missing option", + defaultValue: time.Minute, + option: "opt", + envName: "VAR", + exportEnv: map[string]string{"VAR": "1s"}, + expectedDuration: time.Second, + 
}, + + { + name: "given option and env var", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "4000us"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "1s"}, + expectedDuration: time.Second, + }, + + { + name: "given option with bad env value", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "1s"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "bad"}, + expectedDuration: time.Second, + expectedError: true, + }, + + { + name: "env value with wrong option type", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": false}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "2s"}, + expectedDuration: time.Second * 2, + expectedError: true, + }, + + { + name: "env value with bad option value", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "bad"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "2s"}, + expectedDuration: time.Second * 2, + expectedError: true, + }, + + { + name: "bad env value with bad option value", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "bad"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "bad"}, + expectedDuration: time.Minute, + expectedError: true, + }, + } { + for key, value := range tc.exportEnv { + os.Setenv(key, value) + } + d, err := getDurationOption(tc.envName, tc.option, tc.defaultValue, tc.options) + if err == nil && tc.expectedError { + t.Errorf("[%s] unexpected non-error", tc.name) + } else if err != nil && !tc.expectedError { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + } + if d != tc.expectedDuration { + t.Errorf("[%s] got unexpected duration: %s != %s", tc.name, d.String(), tc.expectedDuration.String()) + } + } +} diff --git a/pkg/dockerregistry/testutil/util.go b/pkg/dockerregistry/testutil/util.go new file mode 100644 index 000000000000..927414073d84 --- /dev/null +++ 
b/pkg/dockerregistry/testutil/util.go @@ -0,0 +1,202 @@ +package testutil + +import ( + "archive/tar" + "bytes" + "crypto/rand" + "encoding/json" + "fmt" + "io" + "io/ioutil" + mrand "math/rand" + "net/url" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/reference" + distclient "github.com/docker/distribution/registry/client" + + kapi "k8s.io/kubernetes/pkg/api" + + imageapi "github.com/openshift/origin/pkg/image/api" +) + +func NewImageForManifest(repoName string, rawManifest string, managedByOpenShift bool) (*imageapi.Image, error) { + var versioned manifest.Versioned + if err := json.Unmarshal([]byte(rawManifest), &versioned); err != nil { + return nil, err + } + + _, desc, err := distribution.UnmarshalManifest(versioned.MediaType, []byte(rawManifest)) + if err != nil { + return nil, err + } + + annotations := make(map[string]string) + if managedByOpenShift { + annotations[imageapi.ManagedByOpenShiftAnnotation] = "true" + } + + img := &imageapi.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: desc.Digest.String(), + Annotations: annotations, + }, + DockerImageReference: fmt.Sprintf("localhost:5000/%s@%s", repoName, desc.Digest.String()), + DockerImageManifest: string(rawManifest), + } + + if err := imageapi.ImageWithMetadata(img); err != nil { + return nil, err + } + + return img, nil +} + +// UploadTestBlob generates a random tar file and uploads it to the given repository. 
+func UploadTestBlob(serverURL *url.URL, repoName string) (distribution.Descriptor, []byte, error) {
+	rs, ds, err := CreateRandomTarFile()
+	if err != nil {
+		return distribution.Descriptor{}, nil, fmt.Errorf("unexpected error generating test layer file: %v", err)
+	}
+	dgst := digest.Digest(ds)
+
+	ctx := context.Background()
+	ref, err := reference.ParseNamed(repoName)
+	if err != nil {
+		return distribution.Descriptor{}, nil, err
+	}
+	repo, err := distclient.NewRepository(ctx, ref, serverURL.String(), nil)
+	if err != nil {
+		return distribution.Descriptor{}, nil, fmt.Errorf("failed to get repository %q: %v", repoName, err)
+	}
+
+	wr, err := repo.Blobs(ctx).Create(ctx)
+	if err != nil {
+		return distribution.Descriptor{}, nil, err
+	}
+	if _, err := io.Copy(wr, rs); err != nil {
+		return distribution.Descriptor{}, nil, fmt.Errorf("unexpected error copying to upload: %v", err)
+	}
+	desc, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst})
+	if err != nil {
+		return distribution.Descriptor{}, nil, err
+	}
+
+	if _, err := rs.Seek(0, 0); err != nil {
+		return distribution.Descriptor{}, nil, fmt.Errorf("failed to seek blob reader: %v", err)
+	}
+	content, err := ioutil.ReadAll(rs)
+	if err != nil {
+		return distribution.Descriptor{}, nil, fmt.Errorf("failed to read blob content: %v", err)
+	}
+
+	return desc, content, nil
+}
+
+// CreateRandomTarFile creates a random tarfile, returning it as an io.ReadSeeker along with its digest. An
+// error is returned if there is a problem generating valid content. Inspired by
+// github.com/docker/distribution/testutil/tarfile.go.
+func CreateRandomTarFile() (rs io.ReadSeeker, dgst digest.Digest, err error) {
+	nFiles := 2
+	target := &bytes.Buffer{}
+	wr := tar.NewWriter(target)
+
+	// Perturb this on each iteration of the loop below.
+ header := &tar.Header{ + Mode: 0644, + ModTime: time.Now(), + Typeflag: tar.TypeReg, + Uname: "randocalrissian", + Gname: "cloudcity", + AccessTime: time.Now(), + ChangeTime: time.Now(), + } + + for fileNumber := 0; fileNumber < nFiles; fileNumber++ { + fileSize := mrand.Int63n(1<<9) + 1<<9 + + header.Name = fmt.Sprint(fileNumber) + header.Size = fileSize + + if err := wr.WriteHeader(header); err != nil { + return nil, "", err + } + + randomData := make([]byte, fileSize) + + // Fill up the buffer with some random data. + n, err := rand.Read(randomData) + + if n != len(randomData) { + return nil, "", fmt.Errorf("short read creating random reader: %v bytes != %v bytes", n, len(randomData)) + } + + if err != nil { + return nil, "", err + } + + nn, err := io.Copy(wr, bytes.NewReader(randomData)) + if nn != fileSize { + return nil, "", fmt.Errorf("short copy writing random file to tar") + } + + if err != nil { + return nil, "", err + } + + if err := wr.Flush(); err != nil { + return nil, "", err + } + } + + if err := wr.Close(); err != nil { + return nil, "", err + } + + dgst = digest.FromBytes(target.Bytes()) + + return bytes.NewReader(target.Bytes()), dgst, nil +} + +const SampleImageManifestSchema1 = `{ + "schemaVersion": 1, + "name": "nm/is", + "tag": "latest", + "architecture": "", + "fsLayers": [ + { + "blobSum": "sha256:b2c5513bd934a7efb412c0dd965600b8cb00575b585eaff1cb980b69037fe6cd" + }, + { + "blobSum": "sha256:2dde6f11a89463bf20dba3b47d8b3b6de7cdcc19e50634e95a18dd95c278768d" + } + ], + "history": [ + { + "v1Compatibility": "{\"size\":18407936}" + }, + { + "v1Compatibility": "{\"size\":19387392}" + } + ], + "signatures": [ + { + "header": { + "jwk": { + "crv": "P-256", + "kid": "5HTY:A24B:L6PG:TQ3G:GMAK:QGKZ:ICD4:S7ZJ:P5JX:UTMP:XZLK:ZXVH", + "kty": "EC", + "x": "j5YnDSyrVIt3NquUKvcZIpbfeD8HLZ7BVBFL4WutRBM", + "y": "PBgFAZ3nNakYN3H9enhrdUrQ_HPYzb8oX5rtJxJo1Y8" + }, + "alg": "ES256" + }, + "signature": 
"1rXiEmWnf9eL7m7Wy3K4l25-Zv2XXl5GgqhM_yjT0ujPmTn0uwfHcCWlweHa9gput3sECj507eQyGpBOF5rD6Q", + "protected": "eyJmb3JtYXRMZW5ndGgiOjQ4NSwiZm9ybWF0VGFpbCI6IkNuMCIsInRpbWUiOiIyMDE2LTA3LTI2VDExOjQ2OjQ2WiJ9" + } + ] +}` diff --git a/pkg/image/api/helper.go b/pkg/image/api/helper.go index 661fdac78e9d..31df54bcc5ae 100644 --- a/pkg/image/api/helper.go +++ b/pkg/image/api/helper.go @@ -436,14 +436,14 @@ func ImageConfigMatchesImage(image *Image, imageConfig []byte) (bool, error) { return v.Verified(), nil } -// ImageWithMetadata returns a copy of image with the DockerImageMetadata filled in -// from the raw DockerImageManifest data stored in the image. +// ImageWithMetadata mutates the given image. It parses raw DockerImageManifest data stored in the image and +// fills its DockerImageMetadata and other fields. func ImageWithMetadata(image *Image) error { if len(image.DockerImageManifest) == 0 { return nil } - if len(image.DockerImageLayers) > 0 && image.DockerImageMetadata.Size > 0 { + if len(image.DockerImageLayers) > 0 && image.DockerImageMetadata.Size > 0 && len(image.DockerImageManifestMediaType) > 0 { glog.V(5).Infof("Image metadata already filled for %s", image.Name) // don't update image already filled return nil @@ -460,6 +460,8 @@ func ImageWithMetadata(image *Image) error { case 0: // legacy config object case 1: + image.DockerImageManifestMediaType = schema1.MediaTypeManifest + if len(manifest.History) == 0 { // should never have an empty history, but just in case... 
return nil @@ -520,6 +522,8 @@ func ImageWithMetadata(image *Image) error { image.DockerImageMetadata.Size = v1Metadata.Size } case 2: + image.DockerImageManifestMediaType = schema2.MediaTypeManifest + config := DockerImageConfig{} if err := json.Unmarshal([]byte(image.DockerImageConfig), &config); err != nil { return err diff --git a/pkg/image/api/helper_test.go b/pkg/image/api/helper_test.go index 8f92c5ef8e32..e7b93fa88efc 100644 --- a/pkg/image/api/helper_test.go +++ b/pkg/image/api/helper_test.go @@ -638,7 +638,6 @@ func validImageWithManifestV2Data() Image { ObjectMeta: kapi.ObjectMeta{ Name: "id", }, - DockerImageManifestMediaType: "application/vnd.docker.container.image.v1+json", DockerImageConfig: `{ "architecture": "amd64", "config": { @@ -816,6 +815,7 @@ func TestImageWithMetadata(t *testing.T) { {Name: "tarsum.dev+sha256:b194de3772ebbcdc8f244f663669799ac1cb141834b7cb8b69100285d357a2b0", MediaType: "application/vnd.docker.container.image.rootfs.diff+x-gtar", LayerSize: 1895}, {Name: "tarsum.dev+sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", MediaType: "application/vnd.docker.container.image.rootfs.diff+x-gtar", LayerSize: 0}, }, + DockerImageManifestMediaType: "application/vnd.docker.distribution.manifest.v1+json", DockerImageMetadata: DockerImage{ ID: "2d24f826cb16146e2016ff349a8a33ed5830f3b938d45c0f82943f4ab8c097e7", Parent: "117ee323aaa9d1b136ea55e4421f4ce413dfc6c0cc6b2186dea6c88d93e1ad7c", @@ -888,7 +888,7 @@ func TestImageWithMetadata(t *testing.T) { }, DockerImageConfig: validImageWithManifestV2Data().DockerImageConfig, DockerImageManifest: validImageWithManifestV2Data().DockerImageManifest, - DockerImageManifestMediaType: "application/vnd.docker.container.image.v1+json", + DockerImageManifestMediaType: "application/vnd.docker.distribution.manifest.v2+json", DockerImageLayers: []ImageLayer{ {Name: "sha256:b4ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4", MediaType: 
"application/vnd.docker.image.rootfs.diff.tar.gzip", LayerSize: 639152}, {Name: "sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa", MediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip", LayerSize: 235231}, diff --git a/test/end-to-end/core.sh b/test/end-to-end/core.sh index 1e29784a8c08..c54a604959c9 100755 --- a/test/end-to-end/core.sh +++ b/test/end-to-end/core.sh @@ -44,6 +44,14 @@ function wait_for_app() { os::cmd::try_until_text "curl -s http://${FRONTEND_IP}:5432/keys/foo" "1337" } +function remove_docker_images() { + local name="$1" + local tag="${2:-\S\+}" + local imageids=$(docker images | sed -n "s,^.*$name\s\+$tag\s\+\(\S\+\).*,\1,p" | sort -u | tr '\n' ' ') + os::cmd::expect_success_and_text "echo '${imageids}' | wc -w" '^[1-9][0-9]*$' + os::cmd::expect_success "docker rmi -f ${imageids}" +} + os::test::junit::declare_suite_start "end-to-end/core" echo "[INFO] openshift version: `openshift version`" @@ -128,56 +136,69 @@ os::cmd::expect_success "docker tag -f centos/ruby-22-centos7:latest ${DOCKER_RE os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/cache/ruby-22-centos7:latest" echo "[INFO] Pushed ruby-22-centos7" +# get image's digest +rubyimagedigest=$(oc get -o jsonpath='{.status.tags[?(@.tag=="latest")].items[0].image}' is/ruby-22-centos7) +echo "[INFO] Ruby image digest: $rubyimagedigest" +# get a random, non-empty blob +rubyimageblob=$(oc get isimage -o go-template='{{range .image.dockerImageLayers}}{{if gt .size 1024.}}{{.name}},{{end}}{{end}}' ruby-22-centos7@${rubyimagedigest} | cut -d , -f 1) +echo "[INFO] Ruby's testing blob digest: $rubyimageblob" + # verify remote images can be pulled directly from the local registry echo "[INFO] Docker pullthrough" os::cmd::expect_success "oc import-image --confirm --from=mysql:latest mysql:pullthrough" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/mysql:pullthrough" -echo "[INFO] Docker start with GCS" +echo "[INFO] Docker registry start with GCS" 
os::cmd::expect_failure_and_text "docker run -e REGISTRY_STORAGE=\"gcs: {}\" openshift/origin-docker-registry:${TAG}" "No bucket parameter provided" +echo "[INFO] Docker pull from istag" +os::cmd::expect_success "oc import-image --confirm --from=hello-world:latest -n test hello-world:pullthrough" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/test/hello-world:pullthrough" +os::cmd::expect_success "docker tag ${DOCKER_REGISTRY}/test/hello-world:pullthrough ${DOCKER_REGISTRY}/cache/hello-world:latest" +os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/cache/hello-world:latest" + # verify we can pull from tagged image (using tag) -imageid=$(docker images | grep centos/ruby-22-centos7 | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7:latest to the same image stream and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7:latest ruby-22-centos7:new-tag" -os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/ruby-22-centos7:new-tag" +remove_docker_images 'cache/hello-world' +echo "[INFO] Tagging hello-world:latest to the same image stream and pulling it" +os::cmd::expect_success "oc tag hello-world:latest hello-world:new-tag" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/hello-world:new-tag" echo "[INFO] The same image stream pull successful" -imageid=$(docker images | grep cache/ruby-22-centos7 | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7:latest to cross repository and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7:latest cross:repo-pull" +remove_docker_images "${DOCKER_REGISTRY}/cache/hello-world" new-tag +echo "[INFO] Tagging hello-world:latest to cross repository and pulling it" +os::cmd::expect_success "oc tag hello-world:latest cross:repo-pull" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/cross:repo-pull" echo "[INFO] Cross repository pull successful" 
-imageid=$(docker images | grep cache/cross | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7:latest to cross namespace and pulling it" -os::cmd::expect_success "oc tag cache/ruby-22-centos7:latest cross:namespace-pull -n custom" +remove_docker_images "${DOCKER_REGISTRY}/cache/cross" "repo-pull" +echo "[INFO] Tagging hello-world:latest to cross namespace and pulling it" +os::cmd::expect_success "oc tag cache/hello-world:latest cross:namespace-pull -n custom" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/custom/cross:namespace-pull" echo "[INFO] Cross namespace pull successful" -# verify we can pull from tagged image (using imageid) -imageid=$(docker images | grep custom/cross | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -tagid=$(oc get istag ruby-22-centos7:latest --template={{.image.metadata.name}}) -echo "[INFO] Tagging ruby-22-centos7@${tagid} to the same image stream and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7@${tagid} ruby-22-centos7:new-id-tag" -os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/ruby-22-centos7:new-id-tag" +# verify we can pull from tagged image (using image digest) +remove_docker_images "${DOCKER_REGISTRY}/custom/cross" namespace-pull +imagedigest=$(oc get istag hello-world:latest --template={{.image.metadata.name}}) +echo "[INFO] Tagging hello-world@${imagedigest} to the same image stream and pulling it" +os::cmd::expect_success "oc tag hello-world@${imagedigest} hello-world:new-id-tag" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/hello-world:new-id-tag" echo "[INFO] The same image stream pull successful" -imageid=$(docker images | grep cache/ruby-22-centos7 | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7@${tagid} to cross repository and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7@${tagid} cross:repo-pull-id" 
+remove_docker_images "${DOCKER_REGISTRY}/cache/hello-world" new-id-tag +echo "[INFO] Tagging hello-world@${imagedigest} to cross repository and pulling it" +os::cmd::expect_success "oc tag hello-world@${imagedigest} cross:repo-pull-id" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/cross:repo-pull-id" echo "[INFO] Cross repository pull successful" -imageid=$(docker images | grep cache/cross | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7@${tagid} to cross namespace and pulling it" -os::cmd::expect_success "oc tag cache/ruby-22-centos7@${tagid} cross:namespace-pull-id -n custom" +remove_docker_images "${DOCKER_REGISTRY}/cache/cross" repo-pull-id +echo "[INFO] Tagging hello-world@${imagedigest} to cross repository and pulling it by id" +os::cmd::expect_success "oc tag hello-world@${imagedigest} cross:repo-pull-id" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/cross@${imagedigest}" +echo "[INFO] Cross repository pull successful" + +remove_docker_images "${DOCKER_REGISTRY}/cache/cross" +echo "[INFO] Tagging hello-world@${imagedigest} to cross namespace and pulling it" +os::cmd::expect_success "oc tag cache/hello-world@${imagedigest} cross:namespace-pull-id -n custom" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/custom/cross:namespace-pull-id" echo "[INFO] Cross namespace pull successful" @@ -191,7 +212,7 @@ echo "[INFO] Docker login as pusher to ${DOCKER_REGISTRY}" os::cmd::expect_success "docker login -u e2e-user -p ${pusher_token} -e pusher@openshift.com ${DOCKER_REGISTRY}" echo "[INFO] Docker login successful" -# Test anonymous registry access +echo "[INFO] Anonymous registry access" # setup: log out of docker, log into openshift as e2e-user to run policy commands, tag image to use for push attempts os::cmd::expect_success 'oc login -u e2e-user' os::cmd::expect_success 'docker pull busybox' @@ -215,13 +236,39 @@ os::cmd::expect_success 'oc policy 
add-role-to-user system:image-pusher system:a os::cmd::try_until_text 'oc policy who-can update imagestreams/layers -n custom' 'system:anonymous' os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/custom/cross:namespace-pull" os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/custom/cross:namespace-pull-id" +echo "[INFO] Anonymous registry access successfull" # log back into docker as e2e-user again os::cmd::expect_success "docker login -u e2e-user -p ${e2e_user_token} -e e2e-user@openshift.com ${DOCKER_REGISTRY}" +os::cmd::expect_success "oc new-project crossmount" +os::cmd::expect_success "oc create imagestream repo" + echo "[INFO] Back to 'default' project with 'admin' user..." os::cmd::expect_success "oc project ${CLUSTER_ADMIN_CONTEXT}" os::cmd::expect_success_and_text 'oc whoami' 'system:admin' +os::cmd::expect_success "oc tag --source docker centos/ruby-22-centos7:latest -n custom ruby-22-centos7:latest" +os::cmd::expect_success 'oc policy add-role-to-user registry-viewer pusher -n custom' +os::cmd::expect_success 'oc policy add-role-to-user system:image-pusher pusher -n crossmount' + +echo "[INFO] Docker cross-repo mount" +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/cache/ruby-22-centos7/blobs/$rubyimageblob'" "200 OK" +os::cmd::try_until_text "oc get -n custom is/ruby-22-centos7 -o 'jsonpath={.status.tags[*].tag}'" "latest" $((20*TIME_SEC)) +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/custom/ruby-22-centos7/blobs/$rubyimageblob'" "200 OK" +os::cmd::try_until_text "oc policy can-i update imagestreams/layers -n crossmount '--token=${pusher_token}'" "yes" +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "404 Not Found" +# 202 means that cross-repo mount has failed (in this case because of blob doesn't exist in the source repository), client needs 
to reupload the blob +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/hello-world'" "202 Accepted" +# 201 means that blob has been cross mounted from given repository +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/ruby-22-centos7'" "201 Created" +# check that the blob is linked now +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "200 OK" +# remove pusher's permissions to read from the source repository +os::cmd::expect_success "oc policy remove-role-from-user system:image-pusher pusher -n cache" +os::cmd::try_until_text "oc policy can-i get imagestreams/layers -n cache '--token=${pusher_token}'" "no" +# cross-repo mount failed because of access denied +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/ruby-22-centos7'" "202 Accepted" +echo "[INFO] Docker cross-repo mount successful" # The build requires a dockercfg secret in the builder service account in order # to be able to push to the registry. Make sure it exists first. diff --git a/vendor/github.com/docker/distribution/blobs.go b/vendor/github.com/docker/distribution/blobs.go index d125330117f8..9788126eb398 100644 --- a/vendor/github.com/docker/distribution/blobs.go +++ b/vendor/github.com/docker/distribution/blobs.go @@ -192,6 +192,16 @@ type BlobCreateOption interface { Apply(interface{}) error } +// CreateOptions is a collection of blob creation modifiers relevant to general +// blob storage intended to be configured by the BlobCreateOption.Apply method. 
+type CreateOptions struct { + Mount struct { + ShouldMount bool + From reference.Canonical + Stat *Descriptor + } +} + // BlobWriter provides a handle for inserting data into a blob store. // Instances should be obtained from BlobWriteService.Writer and // BlobWriteService.Resume. If supported by the store, a writer can be diff --git a/vendor/github.com/docker/distribution/registry/client/repository.go b/vendor/github.com/docker/distribution/registry/client/repository.go index 8cc5f7f9aa03..0a724bb03eca 100644 --- a/vendor/github.com/docker/distribution/registry/client/repository.go +++ b/vendor/github.com/docker/distribution/registry/client/repository.go @@ -649,15 +649,6 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut return writer.Commit(ctx, desc) } -// createOptions is a collection of blob creation modifiers relevant to general -// blob storage intended to be configured by the BlobCreateOption.Apply method. -type createOptions struct { - Mount struct { - ShouldMount bool - From reference.Canonical - } -} - type optionFunc func(interface{}) error func (f optionFunc) Apply(v interface{}) error { @@ -668,7 +659,7 @@ func (f optionFunc) Apply(v interface{}) error { // mounted from the given canonical reference. 
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { return optionFunc(func(v interface{}) error { - opts, ok := v.(*createOptions) + opts, ok := v.(*distribution.CreateOptions) if !ok { return fmt.Errorf("unexpected options type: %T", v) } @@ -681,7 +672,7 @@ func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { } func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { - var opts createOptions + var opts distribution.CreateOptions for _, option := range options { err := option.Apply(&opts) diff --git a/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go b/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go index 38b9e71aa281..5f720a3bb36e 100644 --- a/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go +++ b/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go @@ -101,15 +101,6 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) return desc, lbs.linkBlob(ctx, desc) } -// CreateOptions is a collection of blob creation modifiers relevant to general -// blob storage intended to be configured by the BlobCreateOption.Apply method. -type CreateOptions struct { - Mount struct { - ShouldMount bool - From reference.Canonical - } -} - type optionFunc func(interface{}) error func (f optionFunc) Apply(v interface{}) error { @@ -120,7 +111,7 @@ func (f optionFunc) Apply(v interface{}) error { // mounted from the given canonical reference. 
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { return optionFunc(func(v interface{}) error { - opts, ok := v.(*CreateOptions) + opts, ok := v.(*distribution.CreateOptions) if !ok { return fmt.Errorf("unexpected options type: %T", v) } @@ -136,7 +127,7 @@ func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer") - var opts CreateOptions + var opts distribution.CreateOptions for _, option := range options { err := option.Apply(&opts) @@ -146,7 +137,7 @@ func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution. } if opts.Mount.ShouldMount { - desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest()) + desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest(), opts.Mount.Stat) if err == nil { // Mount successful, no need to initiate an upload session return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} @@ -289,14 +280,21 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest. 
return nil } -func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { - repo, err := lbs.registry.Repository(ctx, sourceRepo) - if err != nil { - return distribution.Descriptor{}, err - } - stat, err := repo.Blobs(ctx).Stat(ctx, dgst) - if err != nil { - return distribution.Descriptor{}, err +func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) { + var stat distribution.Descriptor + if sourceStat == nil { + // look up the blob info from the sourceRepo if not already provided + repo, err := lbs.registry.Repository(ctx, sourceRepo) + if err != nil { + return distribution.Descriptor{}, err + } + stat, err = repo.Blobs(ctx).Stat(ctx, dgst) + if err != nil { + return distribution.Descriptor{}, err + } + } else { + // use the provided blob info + stat = *sourceStat } desc := distribution.Descriptor{