Allow exponential backoff to save PVs to API server #16

Merged
120 changes: 83 additions & 37 deletions controller/controller.go
@@ -131,6 +131,7 @@ type ProvisionController struct {
 	exponentialBackOffOnError bool
 	threadiness               int
 
+	createProvisionedPVBackoff    *wait.Backoff
 	createProvisionedPVRetryCount int
 	createProvisionedPVInterval   time.Duration
 
@@ -244,6 +245,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*ProvisionController) error {
 		if c.HasRun() {
 			return errRuntime
 		}
+		if c.createProvisionedPVBackoff != nil {
+			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
+		}
 		c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
 		return nil
 	}
@@ -256,11 +260,34 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func(*ProvisionController) error {
 		if c.HasRun() {
 			return errRuntime
 		}
+		if c.createProvisionedPVBackoff != nil {
+			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
+		}
 		c.createProvisionedPVInterval = createProvisionedPVInterval
 		return nil
 	}
 }
 
+// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
+// PV object for a provisioned volume. Defaults to linear backoff: 10 seconds, 5 times.
+// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
+// can be used.
+func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
+	return func(c *ProvisionController) error {
+		if c.HasRun() {
+			return errRuntime
+		}
+		if c.createProvisionedPVRetryCount != 0 {
+			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
+		}
+		if c.createProvisionedPVInterval != 0 {
+			return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
+		}
+		c.createProvisionedPVBackoff = &backoff
+		return nil
+	}
+}
+
 // FailedProvisionThreshold is the threshold for max number of retries on
 // failures of Provision. Defaults to 15.
 func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
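
For anyone trying the new option, a minimal caller-side sketch (not part of this diff): `clientset`, `myProvisioner`, and `serverVersion` are placeholders, and the constructor call assumes the `NewProvisionController(client, provisionerName, provisioner, kubeVersion, options...)` signature shown further down in this file.

```go
// Hypothetical caller: opt into exponential backoff for saving provisioned PVs.
backoff := wait.Backoff{
	Duration: 2 * time.Second,  // delay before the second attempt
	Factor:   2.0,              // double the delay after each failure: 2s, 4s, 8s, ...
	Steps:    6,                // at most 6 attempts in total
	Cap:      30 * time.Second, // never sleep longer than 30s between attempts
}
pc := controller.NewProvisionController(
	clientset,                      // kubernetes.Interface (placeholder)
	"example.com/my-provisioner",   // provisioner name (placeholder)
	myProvisioner,                  // your Provisioner implementation (placeholder)
	serverVersion.GitVersion,       // kube version string (placeholder)
	controller.CreateProvisionedPVBackoff(backoff),
)
```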
@@ -452,34 +479,35 @@ func NewProvisionController(
 	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
 
 	controller := &ProvisionController{
-		client:                        client,
-		provisionerName:               provisionerName,
-		provisioner:                   provisioner,
-		kubeVersion:                   utilversion.MustParseSemantic(kubeVersion),
-		id:                            id,
-		component:                     component,
-		eventRecorder:                 eventRecorder,
-		resyncPeriod:                  DefaultResyncPeriod,
-		exponentialBackOffOnError:     DefaultExponentialBackOffOnError,
-		threadiness:                   DefaultThreadiness,
-		createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
-		createProvisionedPVInterval:   DefaultCreateProvisionedPVInterval,
-		failedProvisionThreshold:      DefaultFailedProvisionThreshold,
-		failedDeleteThreshold:         DefaultFailedDeleteThreshold,
-		leaderElection:                DefaultLeaderElection,
-		leaderElectionNamespace:       getInClusterNamespace(),
-		leaseDuration:                 DefaultLeaseDuration,
-		renewDeadline:                 DefaultRenewDeadline,
-		retryPeriod:                   DefaultRetryPeriod,
-		metricsPort:                   DefaultMetricsPort,
-		metricsAddress:                DefaultMetricsAddress,
-		metricsPath:                   DefaultMetricsPath,
-		hasRun:                        false,
-		hasRunLock:                    &sync.Mutex{},
+		client:                    client,
+		provisionerName:           provisionerName,
+		provisioner:               provisioner,
+		kubeVersion:               utilversion.MustParseSemantic(kubeVersion),
+		id:                        id,
+		component:                 component,
+		eventRecorder:             eventRecorder,
+		resyncPeriod:              DefaultResyncPeriod,
+		exponentialBackOffOnError: DefaultExponentialBackOffOnError,
+		threadiness:               DefaultThreadiness,
+		failedProvisionThreshold:  DefaultFailedProvisionThreshold,
+		failedDeleteThreshold:     DefaultFailedDeleteThreshold,
+		leaderElection:            DefaultLeaderElection,
+		leaderElectionNamespace:   getInClusterNamespace(),
+		leaseDuration:             DefaultLeaseDuration,
+		renewDeadline:             DefaultRenewDeadline,
+		retryPeriod:               DefaultRetryPeriod,
+		metricsPort:               DefaultMetricsPort,
+		metricsAddress:            DefaultMetricsAddress,
+		metricsPath:               DefaultMetricsPath,
+		hasRun:                    false,
+		hasRunLock:                &sync.Mutex{},
 	}
 
 	for _, option := range options {
-		option(controller)
+		err := option(controller)
+		if err != nil {
+			glog.Fatalf("Error processing controller options: %s", err)
+		}
 	}
 
 	var rateLimiter workqueue.RateLimiter
@@ -500,6 +528,22 @@ func NewProvisionController(
 	controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
 	controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
 
+	if controller.createProvisionedPVBackoff == nil {
+		// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
+		if controller.createProvisionedPVInterval == 0 {
+			controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
+		}
+		if controller.createProvisionedPVRetryCount == 0 {
+			controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
+		}
+		controller.createProvisionedPVBackoff = &wait.Backoff{
+			Duration: controller.createProvisionedPVInterval,
+			Factor:   1, // linear backoff
+			Steps:    controller.createProvisionedPVRetryCount,
+			Cap:      controller.createProvisionedPVInterval,
+		}
+	}
+
 	informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
 
 	// ----------------------
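
Worth noting: with `Factor: 1` and `Cap` equal to `Duration`, `wait.Backoff` degenerates to a fixed interval, so the legacy interval/retry-count behavior is preserved when callers don't opt in. A sketch of the resulting default schedule, assuming the documented defaults of 10 seconds and 5 retries:

```go
// Sketch (assumed defaults per the option comments: 10s interval, 5 retries).
// wait.ExponentialBackoff runs the condition up to Steps times, sleeping
// Duration*Factor^n between attempts; with Factor == 1 every sleep is ~10s.
b := wait.Backoff{
	Duration: 10 * time.Second,
	Factor:   1, // "linear", i.e. a constant interval
	Steps:    5,
	Cap:      10 * time.Second,
}
_ = b // attempts at roughly t = 0s, 10s, 20s, 30s, 40s, then give up
```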
@@ -1077,47 +1121,49 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
 	}
 
 	// Try to create the PV object several times
-	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
+	var lastSaveError error
+	err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
 		glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
 		if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
 			// Save succeeded.
 			if err != nil {
 				glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
 				err = nil
 			} else {
 				glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
 			}
-			break
+			return true, nil
 		}
 		// Save failed, try again after a while.
 		glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
-		time.Sleep(ctrl.createProvisionedPVInterval)
-	}
+		lastSaveError = err
+		return false, nil
+	})
 
 	if err != nil {
 		// Save failed. Now we have a storage asset outside of Kubernetes,
 		// but we don't have appropriate PV object for it.
 		// Emit some event here and try to delete the storage asset several
 		// times.
-		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
+		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
 		glog.Error(logOperation(operation, strerr))
 		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)
 
-		for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
+		var lastDeleteError error
+		err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
 			if err = ctrl.provisioner.Delete(volume); err == nil {
 				// Delete succeeded
 				glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
-				break
+				return true, nil
 			}
 			// Delete failed, try again after a while.
 			glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
-			time.Sleep(ctrl.createProvisionedPVInterval)
-		}
-
+			lastDeleteError = err
+			return false, nil
+		})
 		if err != nil {
 			// Delete failed several times. There is an orphaned volume and there
 			// is nothing we can do about it.
-			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
+			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
 			glog.Error(logOperation(operation, strerr))
 			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
 		}
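
The behavioral change above is that both retry loops now delegate sleeping and attempt counting to `wait.ExponentialBackoff` from `k8s.io/apimachinery/pkg/util/wait`, and stash the real API error in `lastSaveError`/`lastDeleteError` — when the backoff gives up, `wait.ExponentialBackoff` returns `wait.ErrWaitTimeout`, which would otherwise mask the underlying cause in the emitted event. A condensed, self-contained sketch of that pattern (the `saveObject` helper and the backoff values are illustrative, not from this PR):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// saveObject is a stand-in for the API call being retried.
func saveObject(attempt int) error {
	if attempt < 3 {
		return errors.New("transient apiserver error")
	}
	return nil
}

func main() {
	attempt := 0
	var lastErr error
	backoff := wait.Backoff{Duration: time.Second, Factor: 2, Steps: 5, Cap: 10 * time.Second}
	// The condition returns (true, nil) to stop on success and (false, nil)
	// to keep retrying; wait.ExponentialBackoff sleeps between attempts and
	// returns wait.ErrWaitTimeout once Steps is exhausted.
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempt++
		if e := saveObject(attempt); e != nil {
			lastErr = e // remember the real cause, as the controller does
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		fmt.Printf("gave up: %v (last error: %v)\n", err, lastErr)
		return
	}
	fmt.Printf("saved after %d attempts\n", attempt)
}
```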