Skip to content

Commit

Permalink
Merge pull request #11484 from mfojtik/set-deploy-hooks-timeout
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored Oct 25, 2016
2 parents 1197998 + 95fafba commit 30a00dc
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 120 deletions.
113 changes: 48 additions & 65 deletions pkg/deploy/strategy/support/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,38 @@ const HookContainerName = "lifecycle"

// HookExecutor executes a deployment lifecycle hook.
type HookExecutor struct {
// podClient provides access to pods.
podClient HookExecutorPodClient
// pods provides client to pods
pods kclient.PodsNamespacer
// tags allows setting image stream tags
tags client.ImageStreamTagsNamespacer
// out is where hook pod logs should be written to.
out io.Writer
// podLogStream provides a reader for a pod's logs.
podLogStream func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error)
// decoder is used for encoding/decoding.
decoder runtime.Decoder
// recorder is used to emit events from hooks
events kclient.EventNamespacer
// getPodLogs knows how to get logs from a pod and is used for testing
getPodLogs func(*kapi.Pod) (io.ReadCloser, error)
}

// NewHookExecutor makes a HookExecutor from a client.
func NewHookExecutor(client kclient.PodsNamespacer, tags client.ImageStreamTagsNamespacer, events kclient.EventNamespacer, out io.Writer, decoder runtime.Decoder) *HookExecutor {
return &HookExecutor{
tags: tags,
events: events,
podClient: &HookExecutorPodClientImpl{
CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
return client.Pods(namespace).Create(pod)
},
PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
return NewPodWatch(client, namespace, name, resourceVersion, stopChannel)
},
},
podLogStream: func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error) {
return client.Pods(namespace).GetLogs(name, opts).Stream()
},
func NewHookExecutor(pods kclient.PodsNamespacer, tags client.ImageStreamTagsNamespacer, events kclient.EventNamespacer, out io.Writer, decoder runtime.Decoder) *HookExecutor {
executor := &HookExecutor{
tags: tags,
pods: pods,
events: events,
out: out,
decoder: decoder,
}
executor.getPodLogs = func(pod *kapi.Pod) (io.ReadCloser, error) {
opts := &kapi.PodLogOptions{
Container: HookContainerName,
Follow: true,
Timestamps: false,
}
return executor.pods.Pods(pod.Namespace).GetLogs(pod.Name, opts).Stream()
}
return executor
}

// Execute executes hook in the context of deployment. The suffix is used to
Expand All @@ -83,26 +82,30 @@ func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.R
tagEventMessages = append(tagEventMessages, fmt.Sprintf("image %q as %q", image, t.To.Name))
}
}
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeNormal, "Started", fmt.Sprintf("Running %s-hook (TagImages) %s for deployment %s/%s", label, strings.Join(tagEventMessages, ","), deployment.Namespace, deployment.Name))
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeNormal, "Started",
fmt.Sprintf("Running %s-hook (TagImages) %s for deployment %s/%s", label, strings.Join(tagEventMessages, ","), deployment.Namespace, deployment.Name))
err = e.tagImages(hook, deployment, suffix, label)
case hook.ExecNewPod != nil:
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeNormal, "Started", fmt.Sprintf("Running %s-hook (%q) for deployment %s/%s", label, strings.Join(hook.ExecNewPod.Command, " "), deployment.Namespace, deployment.Name))
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeNormal, "Started",
fmt.Sprintf("Running %s-hook (%q) for deployment %s/%s", label, strings.Join(hook.ExecNewPod.Command, " "), deployment.Namespace, deployment.Name))
err = e.executeExecNewPod(hook, deployment, suffix, label)
}

if err == nil {
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeNormal, "Completed", fmt.Sprintf("The %s-hook for deployment %s/%s completed successfully", label, deployment.Namespace, deployment.Name))
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeNormal, "Completed",
fmt.Sprintf("The %s-hook for deployment %s/%s completed successfully", label, deployment.Namespace, deployment.Name))
return nil
}

// Retry failures are treated the same as Abort.
switch hook.FailurePolicy {
case deployapi.LifecycleHookFailurePolicyAbort, deployapi.LifecycleHookFailurePolicyRetry:
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeWarning, "Failed", fmt.Sprintf("The %s-hook failed: %v, aborting deployment %s/%s", label, err, deployment.Namespace, deployment.Name))
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeWarning, "Failed",
fmt.Sprintf("The %s-hook failed: %v, aborting deployment %s/%s", label, err, deployment.Namespace, deployment.Name))
return fmt.Errorf("the %s hook failed: %v, aborting deployment: %s/%s", label, err, deployment.Namespace, deployment.Name)
case deployapi.LifecycleHookFailurePolicyIgnore:
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeWarning, "Failed", fmt.Sprintf("The %s-hook failed: %v (ignore), deployment %s/%s will continue", label, err, deployment.Namespace, deployment.Name))
fmt.Fprintf(e.out, "the %s hook failed: %v (ignore), deployment %s/%s will continue", label, err, deployment.Namespace, deployment.Name)
strategyutil.RecordConfigEvent(e.events, deployment, e.decoder, kapi.EventTypeWarning, "Failed",
fmt.Sprintf("The %s-hook failed: %v (ignore), deployment %s/%s will continue", label, err, deployment.Namespace, deployment.Name))
return nil
default:
return err
Expand Down Expand Up @@ -170,8 +173,13 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployme
return err
}

deployerPod, err := e.pods.Pods(deployment.Namespace).Get(deployutil.DeployerPodNameForDeployment(deployment.Name))
if err != nil {
return err
}

// Build a pod spec from the hook config and deployment
podSpec, err := makeHookPod(hook, deployment, &config.Spec.Strategy, suffix)
podSpec, err := makeHookPod(hook, deployment, deployerPod, &config.Spec.Strategy, suffix)
if err != nil {
return err
}
Expand All @@ -181,7 +189,7 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployme
completed, created := false, false

// Try to create the pod.
pod, err := e.podClient.CreatePod(deployment.Namespace, podSpec)
pod, err := e.pods.Pods(deployment.Namespace).Create(podSpec)
if err != nil {
if !kerrors.IsAlreadyExists(err) {
return fmt.Errorf("couldn't create lifecycle pod for %s: %v", deployment.Name, err)
Expand All @@ -196,7 +204,7 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployme

stopChannel := make(chan struct{})
defer close(stopChannel)
nextPod := e.podClient.PodWatch(pod.Namespace, pod.Name, pod.ResourceVersion, stopChannel)
nextPod := NewPodWatch(e.pods.Pods(pod.Namespace), pod.Namespace, pod.Name, pod.ResourceVersion, stopChannel)

// Wait for the hook pod to reach a terminal phase. Start reading logs as
// soon as the pod enters a usable phase.
Expand Down Expand Up @@ -264,12 +272,7 @@ waitLoop:
// done.
func (e *HookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) {
defer wg.Done()
opts := &kapi.PodLogOptions{
Container: HookContainerName,
Follow: true,
Timestamps: false,
}
logStream, err := e.podLogStream(pod.Namespace, pod.Name, opts)
logStream, err := e.getPodLogs(pod)
if err != nil || logStream == nil {
fmt.Fprintf(e.out, "warning: Unable to retrieve hook logs from %s: %v\n", pod.Name, err)
return
Expand All @@ -282,7 +285,7 @@ func (e *HookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) {
}

// makeHookPod makes a pod spec from a hook and deployment.
func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, strategy *deployapi.DeploymentStrategy, suffix string) (*kapi.Pod, error) {
func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, deployerPod *kapi.Pod, strategy *deployapi.DeploymentStrategy, suffix string) (*kapi.Pod, error) {
exec := hook.ExecNewPod
var baseContainer *kapi.Container
for _, container := range deployment.Spec.Template.Spec.Containers {
Expand Down Expand Up @@ -318,7 +321,7 @@ func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationCont
}

// Assigning to a variable since its address is required
maxDeploymentDurationSeconds := deployapi.MaxDeploymentDurationSeconds
maxDeploymentDurationSeconds := deployapi.MaxDeploymentDurationSeconds - int64(time.Since(deployerPod.Status.StartTime.Time).Seconds())

// Let the kubelet manage retries if requested
restartPolicy := kapi.RestartPolicyNever
Expand Down Expand Up @@ -405,40 +408,20 @@ func canRetryReading(pod *kapi.Pod, restarts int32) (bool, int32) {
return pod.Spec.RestartPolicy == kapi.RestartPolicyOnFailure && restartCount > restarts, restartCount
}

// HookExecutorPodClient abstracts access to pods.
type HookExecutorPodClient interface {
CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
PodWatch(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod
}

// HookExecutorPodClientImpl is a pluggable HookExecutorPodClient.
type HookExecutorPodClientImpl struct {
CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
PodWatchFunc func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod
}

func (i *HookExecutorPodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
return i.CreatePodFunc(namespace, pod)
}

func (i *HookExecutorPodClientImpl) PodWatch(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
return i.PodWatchFunc(namespace, name, resourceVersion, stopChannel)
}

// NewPodWatch creates a pod watching function which is backed by a
// FIFO/reflector pair. This avoids managing watches directly.
// A stop channel to close the watch's reflector is also returned.
// It is the caller's responsibility to defer closing the stop channel to prevent leaking resources.
func NewPodWatch(client kclient.PodsNamespacer, namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
func NewPodWatch(client kclient.PodInterface, namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
fieldSelector := fields.OneTermEqualSelector("metadata.name", name)
podLW := &cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
opts := kapi.ListOptions{FieldSelector: fieldSelector}
return client.Pods(namespace).List(opts)
options.FieldSelector = fieldSelector
return client.List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
opts := kapi.ListOptions{FieldSelector: fieldSelector, ResourceVersion: options.ResourceVersion}
return client.Pods(namespace).Watch(opts)
options.FieldSelector = fieldSelector
return client.Watch(options)
},
}

Expand Down Expand Up @@ -472,12 +455,12 @@ func NewAcceptNewlyObservedReadyPods(
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
lw := &cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
opts := kapi.ListOptions{LabelSelector: selector}
return kclient.Pods(deployment.Namespace).List(opts)
options.LabelSelector = selector
return kclient.Pods(deployment.Namespace).List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
opts := kapi.ListOptions{LabelSelector: selector, ResourceVersion: options.ResourceVersion}
return kclient.Pods(deployment.Namespace).Watch(opts)
options.LabelSelector = selector
return kclient.Pods(deployment.Namespace).Watch(options)
},
}
stop := make(chan struct{})
Expand Down
Loading

0 comments on commit 30a00dc

Please sign in to comment.