From 77d57e827672e2471873d03447f5f7066b7de0a2 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 19 Apr 2018 13:56:24 -0400 Subject: [PATCH 1/7] client/driver: add image pull progress monitoring --- client/driver/docker.go | 8 +- client/driver/docker_coordinator.go | 186 ++++++++++++++++++++++- client/driver/docker_coordinator_test.go | 10 +- 3 files changed, 193 insertions(+), 11 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index b0b23bea522..d868ae42916 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -234,6 +234,7 @@ type DockerDriverConfig struct { ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the container’s root filesystem as read only AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit. + ImagePullTimeout int64 `mapstructure:"image_pull_timeout"` // Timeout on the image pull after which the pull is cancelled } func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) { @@ -736,6 +737,9 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error { "cpu_hard_limit": { Type: fields.TypeBool, }, + "image_pull_timeout": { + Type: fields.TypeInt, + }, }, } @@ -765,6 +769,7 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault), logger: d.logger, removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault), + emitEvent: d.emitEvent, } return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName) @@ -1546,7 +1551,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := 
d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID) + + return coordinator.PullImage(driverConfig.ImageName, authOptions, time.Duration(driverConfig.ImagePullTimeout)*time.Second, callerID) } // authBackend encapsulates a function that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index ab034c45bbe..43491c89cc4 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -1,13 +1,17 @@ package driver import ( + "bytes" "context" + "encoding/json" "fmt" + "io" "log" "regexp" "sync" "time" + "github.com/docker/docker/pkg/jsonmessage" docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/nomad/nomad/structs" ) @@ -23,6 +27,14 @@ var ( // imageNotFoundMatcher is a regex expression that matches the image not // found error Docker returns. imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`) + + // defaultPullActivityDeadline is the default value set in the imageProgressManager + // when newImageProgressManager is called + defaultPullActivityDeadline = 2 * time.Minute + + // defaultImageProgressReportInterval is the default value set in the + // imageProgressManager when newImageProgressManager is called + defaultImageProgressReportInterval = 10 * time.Second ) // pullFuture is a sharable future for retrieving a pulled images ID and any @@ -84,6 +96,9 @@ type dockerCoordinatorConfig struct { // removeDelay is the delay between an image's reference count going to // zero and the image actually being deleted. 
removeDelay time.Duration + + //emitEvent us the function used to emit an event to a task + emitEvent LogEventFn } // dockerCoordinator is used to coordinate actions against images to prevent @@ -128,9 +143,135 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { return globalCoordinator } +type imageProgress struct { + sync.RWMutex + lastMessage *jsonmessage.JSONMessage + timestamp time.Time +} + +func (p *imageProgress) get() (string, time.Time) { + p.RLock() + defer p.RUnlock() + + if p.lastMessage == nil { + return "No progress", p.timestamp + } + + var prefix string + if p.lastMessage.ID != "" { + prefix = fmt.Sprintf("%s:", p.lastMessage.ID) + } + + if p.lastMessage.Progress == nil { + return fmt.Sprintf("%s%s", prefix, p.lastMessage.Status), p.timestamp + } + + return fmt.Sprintf("%s%s %s", prefix, p.lastMessage.Status, p.lastMessage.Progress.String()), p.timestamp +} + +func (p *imageProgress) set(msg *jsonmessage.JSONMessage) { + p.Lock() + defer p.Unlock() + + p.lastMessage = msg + p.timestamp = time.Now() +} + +type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time) + +type imageProgressManager struct { + *imageProgress + image string + activityDeadline time.Duration + inactivityFunc progressReporterFunc + reportInterval time.Duration + reporter progressReporterFunc + cancel context.CancelFunc + stopCh chan struct{} + buf bytes.Buffer + pullStart time.Time +} + +func newImageProgressManager( + image string, cancel context.CancelFunc, + inactivityFunc, reporter progressReporterFunc) *imageProgressManager { + return &imageProgressManager{ + image: image, + activityDeadline: defaultPullActivityDeadline, + inactivityFunc: inactivityFunc, + reportInterval: defaultImageProgressReportInterval, + reporter: reporter, + imageProgress: &imageProgress{timestamp: time.Now()}, + cancel: cancel, + stopCh: make(chan struct{}), + } +} + +func (pm *imageProgressManager) withActivityDeadline(t 
time.Duration) *imageProgressManager { + pm.activityDeadline = t + return pm +} + +func (pm *imageProgressManager) withReportInterval(t time.Duration) *imageProgressManager { + pm.reportInterval = t + return pm +} + +func (pm *imageProgressManager) start() { + pm.pullStart = time.Now() + go func() { + ticker := time.NewTicker(defaultImageProgressReportInterval) + for { + select { + case <-ticker.C: + msg, timestamp := pm.get() + if time.Now().Sub(timestamp) > pm.activityDeadline { + pm.inactivityFunc(pm.image, msg, timestamp, pm.pullStart) + pm.cancel() + return + } + pm.reporter(pm.image, msg, timestamp, pm.pullStart) + case <-pm.stopCh: + return + } + } + }() +} + +func (pm *imageProgressManager) stop() { + close(pm.stopCh) +} + +func (pm *imageProgressManager) Write(p []byte) (n int, err error) { + n, err = pm.buf.Write(p) + + for { + line, err := pm.buf.ReadBytes('\n') + if err == io.EOF { + break + } + if err != nil { + return n, err + } + var msg jsonmessage.JSONMessage + err = json.Unmarshal(line, &msg) + if err != nil { + return n, err + } + + if msg.Error != nil { + return n, msg.Error + } + + pm.set(&msg) + } + + return +} + // PullImage is used to pull an image. 
It returns the pulled imaged ID or an // error that occurred during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] @@ -138,7 +279,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // Make the future future = newPullFuture() d.pullFutures[image] = future - go d.pullImageImpl(image, authOptions, future) + go d.pullImageImpl(image, authOptions, pullTimeout, future) } d.imageLock.Unlock() @@ -165,15 +306,25 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // pullImageImpl is the implementation of pulling an image. The results are // returned via the passed future -func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) { +func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) { // Parse the repo and tag repo, tag := docker.ParseRepositoryTag(image) if tag == "" { tag = "latest" } + ctx, cancel := context.WithCancel(context.Background()) + if pullTimeout > 0 { + ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(pullTimeout)) + } + defer cancel() + + pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) pullOptions := docker.PullImageOptions{ - Repository: repo, - Tag: tag, + Repository: repo, + Tag: tag, + OutputStream: pm, + RawJSONStream: true, + Context: ctx, } // Attempt to pull the image @@ -181,7 +332,17 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth if authOptions != nil { auth = *authOptions } + + pm.start() + defer 
pm.stop() err := d.client.PullImage(pullOptions, auth) + + if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded { + d.logger.Printf("[ERR] driver.docker: timeout pulling container %s:%s", repo, tag) + future.set("", recoverablePullError(ctxErr, image)) + return + } + if err != nil { d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) future.set("", recoverablePullError(err, image)) @@ -337,6 +498,21 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) { d.imageLock.Unlock() } +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) { + d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) + +} + +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) { + if timestamp.Sub(pullStart) > 10*time.Second { + d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) + } + + if timestamp.Sub(pullStart) > 2*time.Minute { + d.emitEvent("Docker image %s pull progress: %s", image, msg) + } +} + // recoverablePullError wraps the error gotten when trying to pull and image if // the error is recoverable. 
func recoverablePullError(err error, image string) error { diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 1df049b28e6..435dc1fb1f8 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil, uuid.Generate()) + id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate()) }() } @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { callerIDs := make([]string, 10, 10) for i := 0; i < 10; i++ { callerIDs[i] = uuid.Generate() - id, _ = coordinator.PullImage(image, nil, callerIDs[i]) + id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i]) } // Check the reference count @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, callerID) + id, _ := coordinator.PullImage(image, nil, 0, callerID) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil, callerID) + id, _ = coordinator.PullImage(image, nil, 0, callerID) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, callerID) + id, _ := coordinator.PullImage(image, nil, 0, callerID) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 0 { From c44db4ce79cb78f5ec813243a2ec1f287829d83b Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Fri, 20 Apr 2018 00:45:32 -0400 Subject: [PATCH 2/7] 
client/driver: emit progress to all allocs pulling same image --- client/driver/docker.go | 3 +-- client/driver/docker_coordinator.go | 39 +++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index d868ae42916..d32a4c0d86b 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -769,7 +769,6 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault), logger: d.logger, removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault), - emitEvent: d.emitEvent, } return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName) @@ -1552,7 +1551,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, time.Duration(driverConfig.ImagePullTimeout)*time.Second, callerID) + return coordinator.PullImage(driverConfig.ImageName, authOptions, time.Duration(driverConfig.ImagePullTimeout)*time.Second, callerID, d.emitEvent) } // authBackend encapsulates a function that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 43491c89cc4..a6af308a668 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -96,9 +96,6 @@ type dockerCoordinatorConfig struct { // removeDelay is the delay between an image's reference count going to // zero and the image actually being deleted. 
removeDelay time.Duration - - //emitEvent us the function used to emit an event to a task - emitEvent LogEventFn } // dockerCoordinator is used to coordinate actions against images to prevent @@ -113,6 +110,12 @@ type dockerCoordinator struct { // only have one request be sent to Docker pullFutures map[string]*pullFuture + // pullLoggers is used to track the LogEventFn for each alloc pulling an image + pullLoggers map[string][]LogEventFn + + // pullLoggerLock is used to sync access to the pullLoggers map + pullLoggerLock sync.RWMutex + // imageRefCount is the reference count of image IDs imageRefCount map[string]map[string]struct{} @@ -129,6 +132,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { return &dockerCoordinator{ dockerCoordinatorConfig: config, pullFutures: make(map[string]*pullFuture), + pullLoggers: make(map[string][]LogEventFn), imageRefCount: make(map[string]map[string]struct{}), deleteFuture: make(map[string]context.CancelFunc), } @@ -271,10 +275,11 @@ func (pm *imageProgressManager) Write(p []byte) (n int, err error) { // PullImage is used to pull an image. It returns the pulled imaged ID or an // error that occurred during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] + d.registerPullLogger(image, emitFn) if !ok { // Make the future future = newPullFuture() @@ -307,6 +312,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // pullImageImpl is the implementation of pulling an image. 
The results are // returned via the passed future func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) { + defer d.clearPullLogger(image) // Parse the repo and tag repo, tag := docker.ParseRepositoryTag(image) if tag == "" { @@ -498,6 +504,29 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) { d.imageLock.Unlock() } +func (d *dockerCoordinator) registerPullLogger(image string, logger LogEventFn) { + d.pullLoggerLock.Lock() + defer d.pullLoggerLock.Unlock() + if _, ok := d.pullLoggers[image]; !ok { + d.pullLoggers[image] = []LogEventFn{} + } + d.pullLoggers[image] = append(d.pullLoggers[image], logger) +} + +func (d *dockerCoordinator) clearPullLogger(image string) { + d.pullLoggerLock.Lock() + defer d.pullLoggerLock.Unlock() + delete(d.pullLoggers, image) +} + +func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{}) { + d.pullLoggerLock.RLock() + defer d.pullLoggerLock.RUnlock() + for i := range d.pullLoggers[image] { + go d.pullLoggers[image][i](message, args...) 
+ } +} + func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) { d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) @@ -509,7 +538,7 @@ func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestam } if timestamp.Sub(pullStart) > 2*time.Minute { - d.emitEvent("Docker image %s pull progress: %s", image, msg) + d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } } From 6fb6ecdff65728216a379eac7c08df388b89b1a6 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Fri, 20 Apr 2018 23:38:54 -0400 Subject: [PATCH 3/7] client/driver: do accounting on layer pull progress --- client/driver/docker.go | 13 +- client/driver/docker_coordinator.go | 152 +------------ client/driver/docker_coordinator_test.go | 10 +- client/driver/docker_progress.go | 259 +++++++++++++++++++++++ client/driver/docker_progress_test.go | 52 +++++ 5 files changed, 335 insertions(+), 151 deletions(-) create mode 100644 client/driver/docker_progress.go create mode 100644 client/driver/docker_progress_test.go diff --git a/client/driver/docker.go b/client/driver/docker.go index d32a4c0d86b..94fcae9c565 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -234,7 +234,8 @@ type DockerDriverConfig struct { ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the container’s root filesystem as read only AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit. 
- ImagePullTimeout int64 `mapstructure:"image_pull_timeout"` // Timeout on the image pull after which the pull is cancelled + ImagePullTimeoutRaw string `mapstructure:"image_pull_timeout"` // + ImagePullTimeout time.Duration `mapstructure:"-"` // Timeout on the image pull after which the pull is cancelled } func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) { @@ -304,6 +305,12 @@ func (c *DockerDriverConfig) Validate() error { return err } c.Ulimit = ulimit + if len(c.ImagePullTimeoutRaw) > 0 { + c.ImagePullTimeout, err = time.ParseDuration(c.ImagePullTimeoutRaw) + if err != nil { + return err + } + } return nil } @@ -738,7 +745,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error { Type: fields.TypeBool, }, "image_pull_timeout": { - Type: fields.TypeInt, + Type: fields.TypeString, }, }, } @@ -1551,7 +1558,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, time.Duration(driverConfig.ImagePullTimeout)*time.Second, callerID, d.emitEvent) + return coordinator.PullImage(driverConfig.ImageName, authOptions, driverConfig.ImagePullTimeout, callerID, d.emitEvent) } // authBackend encapsulates a function that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index a6af308a668..2a6dcfeae79 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -1,17 +1,13 @@ package driver import ( - "bytes" "context" - "encoding/json" "fmt" - "io" "log" "regexp" "sync" "time" - "github.com/docker/docker/pkg/jsonmessage" docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,14 +23,12 @@ var ( // imageNotFoundMatcher is a regex expression that matches the image not // found error Docker returns. 
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`) +) - // defaultPullActivityDeadline is the default value set in the imageProgressManager - // when newImageProgressManager is called - defaultPullActivityDeadline = 2 * time.Minute - - // defaultImageProgressReportInterval is the default value set in the - // imageProgressManager when newImageProgressManager is called - defaultImageProgressReportInterval = 10 * time.Second +const ( + // dockerPullProgressEmitInterval is the interval at which the pull progress + // is emitted to the allocation + dockerPullProgressEmitInterval = 2 * time.Minute ) // pullFuture is a sharable future for retrieving a pulled images ID and any @@ -147,132 +141,6 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { return globalCoordinator } -type imageProgress struct { - sync.RWMutex - lastMessage *jsonmessage.JSONMessage - timestamp time.Time -} - -func (p *imageProgress) get() (string, time.Time) { - p.RLock() - defer p.RUnlock() - - if p.lastMessage == nil { - return "No progress", p.timestamp - } - - var prefix string - if p.lastMessage.ID != "" { - prefix = fmt.Sprintf("%s:", p.lastMessage.ID) - } - - if p.lastMessage.Progress == nil { - return fmt.Sprintf("%s%s", prefix, p.lastMessage.Status), p.timestamp - } - - return fmt.Sprintf("%s%s %s", prefix, p.lastMessage.Status, p.lastMessage.Progress.String()), p.timestamp -} - -func (p *imageProgress) set(msg *jsonmessage.JSONMessage) { - p.Lock() - defer p.Unlock() - - p.lastMessage = msg - p.timestamp = time.Now() -} - -type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time) - -type imageProgressManager struct { - *imageProgress - image string - activityDeadline time.Duration - inactivityFunc progressReporterFunc - reportInterval time.Duration - reporter progressReporterFunc - cancel context.CancelFunc - stopCh chan struct{} - buf bytes.Buffer - pullStart time.Time -} - -func 
newImageProgressManager( - image string, cancel context.CancelFunc, - inactivityFunc, reporter progressReporterFunc) *imageProgressManager { - return &imageProgressManager{ - image: image, - activityDeadline: defaultPullActivityDeadline, - inactivityFunc: inactivityFunc, - reportInterval: defaultImageProgressReportInterval, - reporter: reporter, - imageProgress: &imageProgress{timestamp: time.Now()}, - cancel: cancel, - stopCh: make(chan struct{}), - } -} - -func (pm *imageProgressManager) withActivityDeadline(t time.Duration) *imageProgressManager { - pm.activityDeadline = t - return pm -} - -func (pm *imageProgressManager) withReportInterval(t time.Duration) *imageProgressManager { - pm.reportInterval = t - return pm -} - -func (pm *imageProgressManager) start() { - pm.pullStart = time.Now() - go func() { - ticker := time.NewTicker(defaultImageProgressReportInterval) - for { - select { - case <-ticker.C: - msg, timestamp := pm.get() - if time.Now().Sub(timestamp) > pm.activityDeadline { - pm.inactivityFunc(pm.image, msg, timestamp, pm.pullStart) - pm.cancel() - return - } - pm.reporter(pm.image, msg, timestamp, pm.pullStart) - case <-pm.stopCh: - return - } - } - }() -} - -func (pm *imageProgressManager) stop() { - close(pm.stopCh) -} - -func (pm *imageProgressManager) Write(p []byte) (n int, err error) { - n, err = pm.buf.Write(p) - - for { - line, err := pm.buf.ReadBytes('\n') - if err == io.EOF { - break - } - if err != nil { - return n, err - } - var msg jsonmessage.JSONMessage - err = json.Unmarshal(line, &msg) - if err != nil { - return n, err - } - - if msg.Error != nil { - return n, msg.Error - } - - pm.set(&msg) - } - - return -} - // PullImage is used to pull an image. 
It returns the pulled imaged ID or an // error that occurred during the pull func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) { @@ -319,10 +187,10 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth tag = "latest" } ctx, cancel := context.WithCancel(context.Background()) + defer cancel() if pullTimeout > 0 { - ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(pullTimeout)) + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(pullTimeout)) } - defer cancel() pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) pullOptions := docker.PullImageOptions{ @@ -533,11 +401,9 @@ func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, p } func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) { - if timestamp.Sub(pullStart) > 10*time.Second { - d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) - } + d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) - if timestamp.Sub(pullStart) > 2*time.Minute { + if timestamp.Sub(pullStart) > dockerPullProgressEmitInterval { d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } } diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 435dc1fb1f8..7feef1f18a2 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate()) + id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate(), nil) }() } @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { callerIDs := make([]string, 
10, 10) for i := 0; i < 10; i++ { callerIDs[i] = uuid.Generate() - id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i]) + id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i], nil) } // Check the reference count @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID) + id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil, 0, callerID) + id, _ = coordinator.PullImage(image, nil, 0, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID) + id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 0 { diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go new file mode 100644 index 00000000000..c62ebf877f4 --- /dev/null +++ b/client/driver/docker_progress.go @@ -0,0 +1,259 @@ +package driver + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/docker/docker/pkg/jsonmessage" + units "github.com/docker/go-units" +) + +const ( + // defaultPullActivityDeadline is the default value set in the imageProgressManager + // when newImageProgressManager is called + defaultPullActivityDeadline = 2 * time.Minute + + // defaultImageProgressReportInterval is the default value set in the + // imageProgressManager when newImageProgressManager is called + defaultImageProgressReportInterval = 10 * 
time.Second +) + +// layerProgress tracks the state and downloaded bytes of a single layer within +// a docker image +type layerProgress struct { + id string + status layerProgressStatus + currentBytes int64 + totalBytes int64 +} + +type layerProgressStatus int + +const ( + layerProgressStatusUnknown layerProgressStatus = iota + layerProgressStatusStarting + layerProgressStatusWaiting + layerProgressStatusDownloading + layerProgressStatusVerifying + layerProgressStatusDownloaded + layerProgressStatusExtracting + layerProgressStatusComplete + layerProgressStatusExists +) + +func lpsFromString(status string) layerProgressStatus { + switch status { + case "Pulling fs layer": + return layerProgressStatusStarting + case "Waiting": + return layerProgressStatusWaiting + case "Downloading": + return layerProgressStatusDownloading + case "Verifying Checksum": + return layerProgressStatusVerifying + case "Download complete": + return layerProgressStatusDownloaded + case "Extracting": + return layerProgressStatusExtracting + case "Pull complete": + return layerProgressStatusComplete + case "Already exists": + return layerProgressStatusExists + default: + return layerProgressStatusUnknown + } +} + +// imageProgress tracks the status of each child layer as its pulled from a +// docker image repo +type imageProgress struct { + sync.RWMutex + lastMessage *jsonmessage.JSONMessage + timestamp time.Time + layers map[string]*layerProgress + pullStart time.Time +} + +// get returns a status message and the timestamp of the last status update +func (p *imageProgress) get() (string, time.Time) { + p.RLock() + defer p.RUnlock() + + if p.lastMessage == nil { + return "No progress", p.timestamp + } + + var pulled, pulling int + for _, l := range p.layers { + if l.status == layerProgressStatusDownloading { + pulling++ + } else if l.status > layerProgressStatusVerifying { + pulled++ + } + } + + elapsed := time.Now().Sub(p.pullStart) + cur := p.currentBytes() + total := p.totalBytes() + var 
est int64 + if cur != 0 { + est = (elapsed.Nanoseconds() / cur * total) - elapsed.Nanoseconds() + } + + return fmt.Sprintf("Pulled %d/%d (%s/%s) pulling %d layers - est %.1fs remaining", + pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)), pulling, + time.Duration(est).Seconds()), p.timestamp +} + +// set takes a status message received from the docker engine api during an image +// pull and updates the status of the coorisponding layer +func (p *imageProgress) set(msg *jsonmessage.JSONMessage) { + p.Lock() + defer p.Unlock() + + p.lastMessage = msg + p.timestamp = time.Now() + + lps := lpsFromString(msg.Status) + if lps == layerProgressStatusUnknown { + return + } + + layer, ok := p.layers[msg.ID] + if !ok { + layer = &layerProgress{id: msg.ID} + p.layers[msg.ID] = layer + } + layer.status = lps + if msg.Progress != nil && lps == layerProgressStatusDownloading { + layer.currentBytes = msg.Progress.Current + layer.totalBytes = msg.Progress.Total + } else if lps == layerProgressStatusDownloaded { + layer.currentBytes = layer.totalBytes + } +} + +// currentBytes iterates through all image layers and sums the total of +// current bytes. The caller is responsible for acquiring a read lock on the +// imageProgress struct +func (p *imageProgress) currentBytes() int64 { + var b int64 + for _, l := range p.layers { + b += l.currentBytes + } + return b +} + +// totalBytes iterates through all image layers and sums the total of +// total bytes. The caller is responsible for acquiring a read lock on the +// imageProgress struct +func (p *imageProgress) totalBytes() int64 { + var b int64 + for _, l := range p.layers { + b += l.totalBytes + } + return b +} + +// progressReporterFunc defines the method for handeling inactivity and report +// events from the imageProgressManager. The image name, current status message, +// timestamp of last received status update and timestamp of when the pull started +// are passed in. 
+type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time) + +// imageProgressManager tracks the progress of pulling a docker image from an +// image repository. +// It also implemented the io.Writer interface so as to be passed to the docker +// client pull image method in order to receive status updates from the docker +// engine api. +type imageProgressManager struct { + imageProgress *imageProgress + image string + activityDeadline time.Duration + inactivityFunc progressReporterFunc + reportInterval time.Duration + reporter progressReporterFunc + cancel context.CancelFunc + stopCh chan struct{} + buf bytes.Buffer +} + +func newImageProgressManager( + image string, cancel context.CancelFunc, + inactivityFunc, reporter progressReporterFunc) *imageProgressManager { + + return &imageProgressManager{ + image: image, + activityDeadline: defaultPullActivityDeadline, + inactivityFunc: inactivityFunc, + reportInterval: defaultImageProgressReportInterval, + reporter: reporter, + imageProgress: &imageProgress{ + timestamp: time.Now(), + layers: make(map[string]*layerProgress), + }, + cancel: cancel, + stopCh: make(chan struct{}), + } +} + +// start intiates the ticker to trigger the inactivity and reporter handlers +func (pm *imageProgressManager) start() { + pm.imageProgress.pullStart = time.Now() + go func() { + ticker := time.NewTicker(defaultImageProgressReportInterval) + for { + select { + case <-ticker.C: + msg, timestamp := pm.imageProgress.get() + if time.Now().Sub(timestamp) > pm.activityDeadline { + pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.cancel() + return + } + pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart) + case <-pm.stopCh: + return + } + } + }() +} + +func (pm *imageProgressManager) stop() { + close(pm.stopCh) +} + +func (pm *imageProgressManager) Write(p []byte) (n int, err error) { + n, err = pm.buf.Write(p) + var msg jsonmessage.JSONMessage + + for { + line, 
err := pm.buf.ReadBytes('\n') + if err == io.EOF { + // Partial write of line; push back onto buffer and break until full line + pm.buf.Write(line) + break + } + if err != nil { + return n, err + } + err = json.Unmarshal(line, &msg) + if err != nil { + return n, err + } + + if msg.Error != nil { + // error received from the docker engine api + return n, msg.Error + } + + pm.imageProgress.set(&msg) + } + + return +} diff --git a/client/driver/docker_progress_test.go b/client/driver/docker_progress_test.go new file mode 100644 index 00000000000..2eb2e86569b --- /dev/null +++ b/client/driver/docker_progress_test.go @@ -0,0 +1,52 @@ +package driver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_DockerImageProgressManager(t *testing.T) { + + pm := &imageProgressManager{ + imageProgress: &imageProgress{ + timestamp: time.Now(), + layers: make(map[string]*layerProgress), + }, + } + + _, err := pm.Write([]byte(`{"status":"Pulling from library/golang","id":"1.9.5"} +{"status":"Pulling fs layer","progressDetail":{},"id":"c73ab1c6897b"} +{"status":"Pulling fs layer","progressDetail":{},"id":"1ab373b3deae"} +`)) + require.NoError(t, err) + require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2") + + cur := pm.imageProgress.currentBytes() + require.Zero(t, cur) + tot := pm.imageProgress.totalBytes() + require.Zero(t, tot) + + _, err = pm.Write([]byte(`{"status":"Pulling fs layer","progress`)) + require.NoError(t, err) + require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2") + + _, err = pm.Write([]byte(`Detail":{},"id":"b542772b4177"}` + "\n")) + require.NoError(t, err) + require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3") + + _, err = pm.Write([]byte(`{"status":"Downloading","progressDetail":{"current":45800,"total":4335495},"progress":"[\u003e ] 45.8kB/4.335MB","id":"b542772b4177"} 
+{"status":"Downloading","progressDetail":{"current":113576,"total":11108010},"progress":"[\u003e ] 113.6kB/11.11MB","id":"1ab373b3deae"} +{"status":"Downloading","progressDetail":{"current":694257,"total":4335495},"progress":"[========\u003e ] 694.3kB/4.335MB","id":"b542772b4177"}` + "\n")) + require.NoError(t, err) + require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3") + require.Equal(t, int64(807833), pm.imageProgress.currentBytes()) + require.Equal(t, int64(15443505), pm.imageProgress.totalBytes()) + + _, err = pm.Write([]byte(`{"status":"Download complete","progressDetail":{},"id":"b542772b4177"}` + "\n")) + require.NoError(t, err) + require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3") + require.Equal(t, int64(4449071), pm.imageProgress.currentBytes()) + require.Equal(t, int64(15443505), pm.imageProgress.totalBytes()) +} From 9ea3080343b912962342edd4267a7a870de0c427 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Wed, 25 Apr 2018 17:05:14 -0400 Subject: [PATCH 4/7] client/driver: remove pull timeout due to race condition that can lead to unexpected timeouts If two jobs are pulling the same image simultaneously, which ever starts the pull first will set the pull timeout. This can lead to a poor UX where the first job requested a short timeout while the second job requested a longer timeout causing the pull to potentially timeout much sooner than expected by the second job. 
--- client/driver/docker.go | 13 +--------- client/driver/docker_coordinator.go | 24 ++++++++--------- client/driver/docker_coordinator_test.go | 10 +++---- client/driver/docker_progress.go | 33 ++++++++++++++---------- 4 files changed, 36 insertions(+), 44 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 94fcae9c565..cc31e547eae 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -234,8 +234,6 @@ type DockerDriverConfig struct { ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the container’s root filesystem as read only AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit. - ImagePullTimeoutRaw string `mapstructure:"image_pull_timeout"` // - ImagePullTimeout time.Duration `mapstructure:"-"` // Timeout on the image pull after which the pull is cancelled } func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) { @@ -305,12 +303,6 @@ func (c *DockerDriverConfig) Validate() error { return err } c.Ulimit = ulimit - if len(c.ImagePullTimeoutRaw) > 0 { - c.ImagePullTimeout, err = time.ParseDuration(c.ImagePullTimeoutRaw) - if err != nil { - return err - } - } return nil } @@ -744,9 +736,6 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error { "cpu_hard_limit": { Type: fields.TypeBool, }, - "image_pull_timeout": { - Type: fields.TypeString, - }, }, } @@ -1558,7 +1547,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, driverConfig.ImagePullTimeout, callerID, d.emitEvent) + return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent) } // authBackend encapsulates a function 
that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 2a6dcfeae79..011f806c346 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -104,7 +104,9 @@ type dockerCoordinator struct { // only have one request be sent to Docker pullFutures map[string]*pullFuture - // pullLoggers is used to track the LogEventFn for each alloc pulling an image + // pullLoggers is used to track the LogEventFn for each alloc pulling an image. + // If multiple alloc's are attempting to pull the same image, each will need + // to register its own LogEventFn with the coordinator. pullLoggers map[string][]LogEventFn // pullLoggerLock is used to sync access to the pullLoggers map @@ -143,7 +145,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { // PullImage is used to pull an image. It returns the pulled imaged ID or an // error that occurred during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] @@ -152,7 +154,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // Make the future future = newPullFuture() d.pullFutures[image] = future - go d.pullImageImpl(image, authOptions, pullTimeout, future) + go d.pullImageImpl(image, authOptions, future) } d.imageLock.Unlock() @@ -179,7 +181,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // pullImageImpl is the implementation of pulling an image. 
The results are // returned via the passed future -func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) { +func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) { defer d.clearPullLogger(image) // Parse the repo and tag repo, tag := docker.ParseRepositoryTag(image) @@ -188,11 +190,10 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if pullTimeout > 0 { - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(pullTimeout)) - } pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) + defer pm.stop() + pullOptions := docker.PullImageOptions{ Repository: repo, Tag: tag, @@ -207,8 +208,6 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth auth = *authOptions } - pm.start() - defer pm.stop() err := d.client.PullImage(pullOptions, auth) if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded { @@ -395,15 +394,14 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{} } } -func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) { +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) { d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) - } -func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) { +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) { d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) - if timestamp.Sub(pullStart) > dockerPullProgressEmitInterval { + if 
interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 { d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } } diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 7feef1f18a2..c81cee99b9e 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate(), nil) + id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil) }() } @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { callerIDs := make([]string, 10, 10) for i := 0; i < 10; i++ { callerIDs[i] = uuid.Generate() - id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i], nil) + id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil) } // Check the reference count @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ = coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; 
len(references) != 0 { diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go index c62ebf877f4..17098e71bc6 100644 --- a/client/driver/docker_progress.go +++ b/client/driver/docker_progress.go @@ -14,13 +14,13 @@ import ( ) const ( - // defaultPullActivityDeadline is the default value set in the imageProgressManager + // dockerPullActivityDeadline is the default value set in the imageProgressManager // when newImageProgressManager is called - defaultPullActivityDeadline = 2 * time.Minute + dockerPullActivityDeadline = 2 * time.Minute - // defaultImageProgressReportInterval is the default value set in the + // dockerImageProgressReportInterval is the default value set in the // imageProgressManager when newImageProgressManager is called - defaultImageProgressReportInterval = 10 * time.Second + dockerImageProgressReportInterval = 10 * time.Second ) // layerProgress tracks the state and downloaded bytes of a single layer within @@ -111,7 +111,7 @@ func (p *imageProgress) get() (string, time.Time) { } // set takes a status message received from the docker engine api during an image -// pull and updates the status of the coorisponding layer +// pull and updates the status of the corresponding layer func (p *imageProgress) set(msg *jsonmessage.JSONMessage) { p.Lock() defer p.Unlock() @@ -162,9 +162,9 @@ func (p *imageProgress) totalBytes() int64 { // progressReporterFunc defines the method for handeling inactivity and report // events from the imageProgressManager. The image name, current status message, -// timestamp of last received status update and timestamp of when the pull started -// are passed in. -type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time) +// timestamp of last received status update, timestamp of when the pull started +// and current report interation are passed in. 
+type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64) // imageProgressManager tracks the progress of pulling a docker image from an // image repository. @@ -187,11 +187,11 @@ func newImageProgressManager( image string, cancel context.CancelFunc, inactivityFunc, reporter progressReporterFunc) *imageProgressManager { - return &imageProgressManager{ + pm := &imageProgressManager{ image: image, - activityDeadline: defaultPullActivityDeadline, + activityDeadline: dockerPullActivityDeadline, inactivityFunc: inactivityFunc, - reportInterval: defaultImageProgressReportInterval, + reportInterval: dockerImageProgressReportInterval, reporter: reporter, imageProgress: &imageProgress{ timestamp: time.Now(), @@ -200,23 +200,28 @@ func newImageProgressManager( cancel: cancel, stopCh: make(chan struct{}), } + + pm.start() + return pm } // start intiates the ticker to trigger the inactivity and reporter handlers func (pm *imageProgressManager) start() { pm.imageProgress.pullStart = time.Now() go func() { - ticker := time.NewTicker(defaultImageProgressReportInterval) + ticker := time.NewTicker(dockerImageProgressReportInterval) + var interval int64 for { + interval++ select { case <-ticker.C: msg, timestamp := pm.imageProgress.get() if time.Now().Sub(timestamp) > pm.activityDeadline { - pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) pm.cancel() return } - pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) case <-pm.stopCh: return } } From 6337f8e32e39b8f3a8e0487478244002f06a39d2 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 3 May 2018 14:39:03 -0400 Subject: [PATCH 5/7] client/driver: add separate handler for emitting pull progress --- client/driver/docker_coordinator.go | 13 ++++---- client/driver/docker_progress.go
| 50 +++++++++++++++++------------ 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 011f806c346..47186062433 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -191,7 +191,8 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) + pm := newImageProgressManager(image, cancel, d.handlePullInactivity, + d.handlePullProgressReport, d.handleSlowPullProgressReport) defer pm.stop() pullOptions := docker.PullImageOptions{ @@ -394,16 +395,16 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{} } } -func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) { +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) { d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) } -func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) { +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) { d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) +} - if interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 { - d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) - } +func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) { + d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } // recoverablePullError wraps the error gotten when trying to pull and image if diff --git a/client/driver/docker_progress.go 
b/client/driver/docker_progress.go index 17098e71bc6..ed0da74ceca 100644 --- a/client/driver/docker_progress.go +++ b/client/driver/docker_progress.go @@ -161,10 +161,9 @@ func (p *imageProgress) totalBytes() int64 { } // progressReporterFunc defines the method for handeling inactivity and report -// events from the imageProgressManager. The image name, current status message, -// timestamp of last received status update, timestamp of when the pull started -// and current report interation are passed in. -type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64) +// events from the imageProgressManager. The image name, current status message +// and timestamp of last received status update are passed in. +type progressReporterFunc func(image string, msg string, timestamp time.Time) // imageProgressManager tracks the progress of pulling a docker image from an // image repository. @@ -172,20 +171,23 @@ type progressReporterFunc func(image string, msg string, timestamp time.Time, pu // client pull image method in order to receive status updates from the docker // engine api. 
type imageProgressManager struct { - imageProgress *imageProgress - image string - activityDeadline time.Duration - inactivityFunc progressReporterFunc - reportInterval time.Duration - reporter progressReporterFunc - cancel context.CancelFunc - stopCh chan struct{} - buf bytes.Buffer + imageProgress *imageProgress + image string + activityDeadline time.Duration + inactivityFunc progressReporterFunc + reportInterval time.Duration + reporter progressReporterFunc + slowReportInterval time.Duration + slowReporter progressReporterFunc + lastSlowReport time.Time + cancel context.CancelFunc + stopCh chan struct{} + buf bytes.Buffer } func newImageProgressManager( image string, cancel context.CancelFunc, - inactivityFunc, reporter progressReporterFunc) *imageProgressManager { + inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager { pm := &imageProgressManager{ image: image, @@ -193,6 +195,7 @@ func newImageProgressManager( inactivityFunc: inactivityFunc, reportInterval: dockerImageProgressReportInterval, reporter: reporter, + slowReporter: slowReporter, imageProgress: &imageProgress{ timestamp: time.Now(), layers: make(map[string]*layerProgress), @@ -207,21 +210,26 @@ func newImageProgressManager( // start intiates the ticker to trigger the inactivity and reporter handlers func (pm *imageProgressManager) start() { - pm.imageProgress.pullStart = time.Now() + now := time.Now() + pm.imageProgress.pullStart = now + pm.lastSlowReport = now go func() { ticker := time.NewTicker(dockerImageProgressReportInterval) - var interval int64 for { - interval++ select { case <-ticker.C: - msg, timestamp := pm.imageProgress.get() - if time.Now().Sub(timestamp) > pm.activityDeadline { - pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) + msg, lastStatusTime := pm.imageProgress.get() + t := time.Now() + if t.Sub(lastStatusTime) > pm.activityDeadline { + pm.inactivityFunc(pm.image, msg, lastStatusTime) pm.cancel() return } - 
pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) + if t.Sub(pm.lastSlowReport) > pm.slowReportInterval { + pm.slowReporter(pm.image, msg, lastStatusTime) + pm.lastSlowReport = t + } + pm.reporter(pm.image, msg, lastStatusTime) case <-pm.stopCh: return } From e324e86632ca16dd535218a5206580126fa1bec5 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 3 May 2018 15:14:53 -0400 Subject: [PATCH 6/7] client/driver: add waiting layer status count to pull progress status msg --- client/driver/docker_progress.go | 36 ++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go index ed0da74ceca..912c2d4a884 100644 --- a/client/driver/docker_progress.go +++ b/client/driver/docker_progress.go @@ -21,6 +21,10 @@ const ( // dockerImageProgressReportInterval is the default value set in the // imageProgressManager when newImageProgressManager is called dockerImageProgressReportInterval = 10 * time.Second + + // dockerImageSlowProgressReportInterval is the default value set in the + // imageProgressManager when newImageProgressManager is called + dockerImageSlowProgressReportInterval = 2 * time.Minute ) // layerProgress tracks the state and downloaded bytes of a single layer within @@ -88,11 +92,16 @@ func (p *imageProgress) get() (string, time.Time) { return "No progress", p.timestamp } - var pulled, pulling int + var pulled, pulling, waiting int for _, l := range p.layers { - if l.status == layerProgressStatusDownloading { + switch { + case l.status == layerProgressStatusStarting || + l.status == layerProgressStatusWaiting: + waiting++ + case l.status == layerProgressStatusDownloading || + l.status == layerProgressStatusVerifying: pulling++ - } else if l.status > layerProgressStatusVerifying { + case l.status >= layerProgressStatusDownloaded: pulled++ } } @@ -105,9 +114,9 @@ func (p *imageProgress) get() (string, time.Time) { est = 
(elapsed.Nanoseconds() / cur * total) - elapsed.Nanoseconds() } - return fmt.Sprintf("Pulled %d/%d (%s/%s) pulling %d layers - est %.1fs remaining", - pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)), pulling, - time.Duration(est).Seconds()), p.timestamp + return fmt.Sprintf("Pulled %d/%d (%s/%s) layers: %d waiting/%d pulling - est %.1fs remaining", + pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)), + waiting, pulling, time.Duration(est).Seconds()), p.timestamp } // set takes a status message received from the docker engine api during an image @@ -160,7 +169,7 @@ func (p *imageProgress) totalBytes() int64 { return b } -// progressReporterFunc defines the method for handeling inactivity and report +// progressReporterFunc defines the method for handling inactivity and report // events from the imageProgressManager. The image name, current status message // and timestamp of last received status update are passed in. 
type progressReporterFunc func(image string, msg string, timestamp time.Time) @@ -190,12 +199,13 @@ func newImageProgressManager( inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager { pm := &imageProgressManager{ - image: image, - activityDeadline: dockerPullActivityDeadline, - inactivityFunc: inactivityFunc, - reportInterval: dockerImageProgressReportInterval, - reporter: reporter, - slowReporter: slowReporter, + image: image, + activityDeadline: dockerPullActivityDeadline, + inactivityFunc: inactivityFunc, + reportInterval: dockerImageProgressReportInterval, + reporter: reporter, + slowReportInterval: dockerImageSlowProgressReportInterval, + slowReporter: slowReporter, imageProgress: &imageProgress{ timestamp: time.Now(), layers: make(map[string]*layerProgress), From d442a444e076e7d0fcdc81271bab800f00729696 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 3 May 2018 15:30:57 -0400 Subject: [PATCH 7/7] changelog: add changelog entry for docker image pull progress monitoring --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5a03067de0..16e5a54d42d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ IMPROVEMENTS: * command: Add -short option to init command that emits a minimal jobspec [[GH-4239](https://github.com/hashicorp/nomad/issues/4239)] * discovery: Support Consul gRPC health checks. [[GH-4251](https://github.com/hashicorp/nomad/issues/4251)] + * driver/docker: Add progress monitoring and inactivity detection to docker + image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)] ## 0.8.3 (April 27, 2018)