diff --git a/integration/build_local_containerd_helper_test.go b/integration/build_local_containerd_helper_test.go index 1df6051065c5..1cbdd090541e 100644 --- a/integration/build_local_containerd_helper_test.go +++ b/integration/build_local_containerd_helper_test.go @@ -21,14 +21,17 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/containerd/containerd" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/log/logtest" "github.com/containerd/containerd/pkg/cri/constants" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" ctrdsrv "github.com/containerd/containerd/services/server" srvconfig "github.com/containerd/containerd/services/server/config" + "github.com/opencontainers/go-digest" _ "github.com/containerd/containerd/diff/walking/plugin" "github.com/containerd/containerd/events/exchange" @@ -59,9 +62,11 @@ var ( loadedPluginsErr error ) +type tweakPluginInitFunc func(t *testing.T, p *plugin.Registration) *plugin.Registration + // buildLocalContainerdClient is to return containerd client with initialized // core plugins in local. 
-func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client { +func buildLocalContainerdClient(t *testing.T, tmpDir string, tweakInitFn tweakPluginInitFunc) *containerd.Client { ctx := logtest.WithT(context.Background(), t) // load plugins @@ -107,6 +112,10 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client initContext.Config = pc } + if tweakInitFn != nil { + p = tweakInitFn(t, p) + } + result := p.Init(initContext) assert.NoError(t, initialized.Add(result)) @@ -126,3 +135,61 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client return client } + +func tweakContentInitFnWithDelayer(commitDelayDuration time.Duration) tweakPluginInitFunc { + return func(t *testing.T, p *plugin.Registration) *plugin.Registration { + if p.URI() != "io.containerd.content.v1.content" { + return p + } + + oldInitFn := p.InitFn + p.InitFn = func(ic *plugin.InitContext) (interface{}, error) { + instance, err := oldInitFn(ic) + if err != nil { + return nil, err + } + + return &contentStoreDelayer{ + t: t, + + Store: instance.(content.Store), + commitDelayDuration: commitDelayDuration, + }, nil + } + return p + } +} + +type contentStoreDelayer struct { + t *testing.T + + content.Store + commitDelayDuration time.Duration +} + +func (cs *contentStoreDelayer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + w, err := cs.Store.Writer(ctx, opts...) 
+ if err != nil { + return nil, err + } + + return &contentWriterDelayer{ + t: cs.t, + + Writer: w, + commitDelayDuration: cs.commitDelayDuration, + }, nil +} + +type contentWriterDelayer struct { + t *testing.T + + content.Writer + commitDelayDuration time.Duration +} + +func (w *contentWriterDelayer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + w.t.Logf("[testcase: %s] Commit %v blob after %v", w.t.Name(), expected, w.commitDelayDuration) + time.Sleep(w.commitDelayDuration) + return w.Writer.Commit(ctx, size, expected, opts...) +} diff --git a/integration/image_pull_timeout_test.go b/integration/image_pull_timeout_test.go index eb74d7d0c94f..994f04814a39 100644 --- a/integration/image_pull_timeout_test.go +++ b/integration/image_pull_timeout_test.go @@ -62,6 +62,41 @@ func TestCRIImagePullTimeout(t *testing.T) { t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter) t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred) + t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter) +} + +// testCRIImagePullTimeoutBySlowCommitWriter tests that +// +// It should not cancel if the content.Commit takes a long time. +// +// After copying all the data from the registry, the request should be inactive +// before content.Commit. If the blob is large, for instance, 2 GiB, the fsync +// during content.Commit may take a long time under IO pressure. The +// content.Commit holds the bolt's writable mutex and blocks other goroutines +// which are going to commit blobs as well. If the progress tracker still +// considers these requests active, it may raise a false alert and cancel the +// ImagePull. +// +// It's a reproducer for #9347.
+func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) { + t.Parallel() + + tmpDir := t.TempDir() + + delayDuration := 2 * defaultImagePullProgressTimeout + cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration)) + + criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{}) + assert.NoError(t, err) + + ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace) + + _, err = criService.PullImage(ctx, &runtimeapi.PullImageRequest{ + Image: &runtimeapi.ImageSpec{ + Image: pullProgressTestImageName, + }, + }) + assert.NoError(t, err) } // testCRIImagePullTimeoutByHoldingContentOpenWriter tests that @@ -76,7 +111,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) { tmpDir := t.TempDir() - cli := buildLocalContainerdClient(t, tmpDir) + cli := buildLocalContainerdClient(t, tmpDir, nil) criService, err := initLocalCRIPlugin(cli, tmpDir, criconfig.Registry{}) assert.NoError(t, err) @@ -214,7 +249,7 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) { tmpDir := t.TempDir() - cli := buildLocalContainerdClient(t, tmpDir) + cli := buildLocalContainerdClient(t, tmpDir, nil) mirrorSrv := newMirrorRegistryServer(mirrorRegistryServerConfig{ limitedBytesPerConn: 1024 * 1024 * 3, // 3MB diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 493eccfb79bc..2cbcf9b353cb 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -28,6 +28,30 @@ import ( "github.com/containerd/containerd/plugin" ) +const ( + // defaultImagePullProgressTimeoutDuration is the default value of imagePullProgressTimeout. + // + // NOTE: + // + // This ImagePullProgressTimeout feature is ported from kubelet/dockershim's + // --image-pull-progress-deadline. The original value is 1m0. Unlike docker + // daemon, the containerd doesn't have global concurrent download limitation + // before migrating to Transfer Service. 
If kubelet runs with concurrent + // image pulls, the node will run under IO pressure. The ImagePull process + // could slow itself down if the target image is a large one with a + // lot of layers. Also, both containers' writable layers and the image storage + // share one disk. The ImagePull process commits blobs to the content store + // with fsync, which might flush unrelated files' dirty pages to + // disk in one transaction [1]. The 1m0 value isn't good enough. Based + // on the #9347 case and the kubernetes community's usage [2], the default value + // is updated to 5m0. If end-users still run into unexpected cancellations, + // they need to configure it based on their environment. + // + // [1]: Fast commits for ext4 - https://lwn.net/Articles/842385/ + // [2]: https://github.com/kubernetes/kubernetes/blob/1635c380b26a1d8cc25d36e9feace9797f4bae3c/cluster/gce/util.sh#L882 + defaultImagePullProgressTimeoutDuration = 5 * time.Minute +) + type SandboxControllerMode string const ( diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index 72348ca3fc12..3eb8e9b94983 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -19,8 +19,6 @@ package config import ( - "time" - "github.com/containerd/containerd" "github.com/containerd/containerd/pkg/cri/streaming" "github.com/pelletier/go-toml" @@ -109,7 +107,7 @@ func DefaultConfig() PluginConfig { }, EnableCDI: false, CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"}, - ImagePullProgressTimeout: time.Minute.String(), + ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(), DrainExecSyncIOTimeout: "0s", } } diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index 63bb2decdf87..d3ba9a3983e1 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -19,7 +19,6 @@ package config import ( "os" "path/filepath" - "time" "github.com/containerd/containerd" "github.com/containerd/containerd/pkg/cri/streaming" @@
-85,7 +84,7 @@ func DefaultConfig() PluginConfig { ImageDecryption: ImageDecryption{ KeyModel: KeyModelNode, }, - ImagePullProgressTimeout: time.Minute.String(), + ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(), DrainExecSyncIOTimeout: "0s", } } diff --git a/pkg/cri/server/image_pull.go b/pkg/cri/server/image_pull.go index 86ba05b2cb25..3e8d5d2bb382 100644 --- a/pkg/cri/server/image_pull.go +++ b/pkg/cri/server/image_pull.go @@ -576,9 +576,6 @@ func (c *criService) encryptedImagesPullOpts() []containerd.RemoteOpt { } const ( - // minPullProgressReportInternal is used to prevent the reporter from - // eating more CPU resources - minPullProgressReportInternal = 5 * time.Second // defaultPullProgressReportInterval represents that how often the // reporter checks that pull progress. defaultPullProgressReportInterval = 10 * time.Second @@ -626,10 +623,6 @@ func (reporter *pullProgressReporter) start(ctx context.Context) { // check progress more frequently if timeout < default internal if reporter.timeout < reportInterval { reportInterval = reporter.timeout / 2 - - if reportInterval < minPullProgressReportInternal { - reportInterval = minPullProgressReportInternal - } } var ticker = time.NewTicker(reportInterval) @@ -644,9 +637,9 @@ func (reporter *pullProgressReporter) start(ctx context.Context) { WithField("activeReqs", activeReqs). WithField("totalBytesRead", bytesRead). WithField("lastSeenBytesRead", lastSeenBytesRead). - WithField("lastSeenTimestamp", lastSeenTimestamp). + WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)). WithField("reportInterval", reportInterval). 
- Tracef("progress for image pull") + Debugf("progress for image pull") if activeReqs == 0 || bytesRead > lastSeenBytesRead { lastSeenBytesRead = bytesRead diff --git a/remotes/docker/httpreadseeker.go b/remotes/docker/httpreadseeker.go index 9a827ef04c25..82435933906d 100644 --- a/remotes/docker/httpreadseeker.go +++ b/remotes/docker/httpreadseeker.go @@ -76,6 +76,16 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { if _, err2 := hrs.reader(); err2 == nil { return n, nil } + } else if err == io.EOF { + // The CRI's imagePullProgressTimeout relies on responseBody.Close to + // update the process monitor's status. If the err is io.EOF, close + // the connection since there is no more available data. + if hrs.rc != nil { + if clsErr := hrs.rc.Close(); clsErr != nil { + log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser after io.EOF") + } + hrs.rc = nil + } } return }