diff --git a/glide.lock b/glide.lock index 9ecb38c82..7bed47da3 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: b737760e0f81a2850433fe5b9a0ddc1d6b15d93a1207fe0a607313978062c265 -updated: 2017-07-07T10:37:48.142224896+06:00 +updated: 2017-07-07T12:14:25.253279508+06:00 imports: - name: cloud.google.com/go version: fe3d41e1ecb2ce36ad3a979037c9b9a2b726226f @@ -15,7 +15,6 @@ imports: - runtime - types - version - - wait - name: github.com/appscode/log version: 68e0e42e42c01371f3950571fc87c2298a51dd8f - name: github.com/appscode/osm @@ -218,9 +217,7 @@ imports: - api/install - client/clientset - pkg/analytics - - pkg/controller - pkg/docker - - pkg/eventer - pkg/storage - pkg/validator - name: github.com/k8sdb/elasticsearch @@ -243,8 +240,6 @@ imports: version: ad45545899c7b13c020ea92b2072220eefad42b8 - name: github.com/ncw/swift version: 9e6fdb8957a022d5780a78b58d6181c3580bb01f -- name: github.com/orcaman/concurrent-map - version: 2ae17bc4c860c83513ee50feb9746f3e50d7515d - name: github.com/pborman/uuid version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 - name: github.com/pkg/errors @@ -345,8 +340,6 @@ imports: - urlfetch - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 -- name: gopkg.in/robfig/cron.v2 - version: be2e0b0deed5a68ffee390b4583a13aff8321535 - name: gopkg.in/yaml.v2 version: cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b - name: k8s.io/apimachinery diff --git a/pkg/describer/k8sdb_describer.go b/pkg/describer/k8sdb_describer.go index c8f0d17d4..87f34e1b7 100644 --- a/pkg/describer/k8sdb_describer.go +++ b/pkg/describer/k8sdb_describer.go @@ -321,13 +321,13 @@ func listSnapshots(snapshotList *tapi.SnapshotList, out io.Writer) { fmt.Fprint(w, " Name\tBucket\tStartTime\tCompletionTime\tPhase\n") fmt.Fprint(w, " ----\t------\t---------\t--------------\t-----\n") for _, e := range snapshotList.Items { - container, err := e.Spec.SnapshotStorageSpec.Location() + location, err := e.Spec.SnapshotStorageSpec.Location() if err != nil { - container = statusUnknown + location = "" } fmt.Fprintf(w, " %s\t%s\t%s\t%s\t%s\n", e.Name, - container, + location, timeToString(e.Status.StartTime), timeToString(e.Status.CompletionTime), e.Status.Phase, diff --git a/pkg/printer/resource_printer.go b/pkg/printer/resource_printer.go index 768214fd9..e3ac9ccbf 100644 --- a/pkg/printer/resource_printer.go +++ b/pkg/printer/resource_printer.go @@ -270,11 +270,11 @@ func (h *HumanReadablePrinter) printSnapshot(item *tapi.Snapshot, w io.Writer, o } if options.Wide { - container, err := item.Spec.SnapshotStorageSpec.Location() + loc, err := item.Spec.SnapshotStorageSpec.Location() if err != nil { - container = statusUnknown + loc = statusUnknown } - if _, err := fmt.Fprintf(w, "%s\t", container); err != nil { + if _, err := fmt.Fprintf(w, "%s\t", loc); err != nil { return err } } diff --git a/vendor/github.com/appscode/go/wait/doc.go b/vendor/github.com/appscode/go/wait/doc.go deleted file mode 100644 index ff89dc170..000000000 --- a/vendor/github.com/appscode/go/wait/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package wait provides tools for polling or listening for changes -// to a condition. -package wait diff --git a/vendor/github.com/appscode/go/wait/wait.go b/vendor/github.com/appscode/go/wait/wait.go deleted file mode 100644 index 23de4c7b7..000000000 --- a/vendor/github.com/appscode/go/wait/wait.go +++ /dev/null @@ -1,332 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package wait - -import ( - "errors" - "math/rand" - "time" - - "github.com/appscode/go/runtime" -) - -// For any test of the style: -// ... -// <- time.After(timeout): -// t.Errorf("Timed out") -// The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s -// is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine -// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. -var ForeverTestTimeout = time.Second * 30 - -// NeverStop may be passed to Until to make it never stop. -var NeverStop <-chan struct{} = make(chan struct{}) - -// Forever calls f every period for ever. -// -// Forever is syntactic sugar on top of Until. -func Forever(f func(), period time.Duration) { - Until(f, period, NeverStop) -} - -// Until loops until stop channel is closed, running f every period. -// -// Until is syntactic sugar on top of JitterUntil with zero jitter factor and -// with sliding = true (which means the timer for period starts after the f -// completes). -func Until(f func(), period time.Duration, stopCh <-chan struct{}) { - JitterUntil(f, period, 0.0, true, stopCh) -} - -// NonSlidingUntil loops until stop channel is closed, running f every -// period. -// -// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter -// factor, with sliding = false (meaning the timer for period starts at the same -// time as the function starts). -func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { - JitterUntil(f, period, 0.0, false, stopCh) -} - -// JitterUntil loops until stop channel is closed, running f every period. -// -// If jitterFactor is positive, the period is jittered before every run of f. -// If jitterFactor is not positive, the period is unchanged and not jitterd. -// -// If slidingis true, the period is computed after f runs. If it is false then -// period includes the runtime for f. -// -// Close stopCh to stop. f may not be invoked if stop channel is already -// closed. Pass NeverStop to if you don't want it stop. -func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { - for { - - select { - case <-stopCh: - return - default: - } - - jitteredPeriod := period - if jitterFactor > 0.0 { - jitteredPeriod = Jitter(period, jitterFactor) - } - - var t *time.Timer - if !sliding { - t = time.NewTimer(jitteredPeriod) - } - - func() { - defer runtime.HandleCrash() - f() - }() - - if sliding { - t = time.NewTimer(jitteredPeriod) - } - - // NOTE: b/c there is no priority selection in golang - // it is possible for this to race, meaning we could - // trigger t.C and stopCh, and t.C select falls through. - // In order to mitigate we re-check stopCh at the beginning - // of every loop to prevent extra executions of f(). - select { - case <-stopCh: - return - case <-t.C: - } - } -} - -// Jitter returns a time.Duration between duration and duration + maxFactor * -// duration. -// -// This allows clients to avoid converging on periodic behavior. If maxFactor -// is 0.0, a suggested default value will be chosen. -func Jitter(duration time.Duration, maxFactor float64) time.Duration { - if maxFactor <= 0.0 { - maxFactor = 1.0 - } - wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) - return wait -} - -// ErrWaitTimeout is returned when the condition exited without success. -var ErrWaitTimeout = errors.New("timed out waiting for the condition") - -// ConditionFunc returns true if the condition is satisfied, or an error -// if the loop should be aborted. -type ConditionFunc func() (done bool, err error) - -// Backoff holds parameters applied to a Backoff function. -type Backoff struct { - Duration time.Duration // the base duration - Factor float64 // Duration is multiplied by factor each iteration - Jitter float64 // The amount of jitter applied each iteration - Steps int // Exit with error after this many steps -} - -// ExponentialBackoff repeats a condition check with exponential backoff. -// -// It checks the condition up to Steps times, increasing the wait by multiplyingEnsureD -// the previous duration by Factor. -// -// If Jitter is greater than zero, a random amount of each duration is added -// (between duration and duration*(1+jitter)). -// -// If the condition never returns true, ErrWaitTimeout is returned. All other -// errors terminate immediately. -func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { - duration := backoff.Duration - for i := 0; i < backoff.Steps; i++ { - if i != 0 { - adjusted := duration - if backoff.Jitter > 0.0 { - adjusted = Jitter(duration, backoff.Jitter) - } - time.Sleep(adjusted) - duration = time.Duration(float64(duration) * backoff.Factor) - } - if ok, err := condition(); err != nil || ok { - return err - } - } - return ErrWaitTimeout -} - -// Poll tries a condition func until it returns true, an error, or the timeout -// is reached. -// -// Poll always waits the interval before the run of 'condition'. -// 'condition' will always be invoked at least once. -// -// Some intervals may be missed if the condition takes too long or the time -// window is too short. -// -// If you want to Poll something forever, see PollInfinite. -func Poll(interval, timeout time.Duration, condition ConditionFunc) error { - return pollInternal(poller(interval, timeout), condition) -} - -func pollInternal(wait WaitFunc, condition ConditionFunc) error { - return WaitFor(wait, condition, NeverStop) -} - -// PollImmediate tries a condition func until it returns true, an error, or the timeout -// is reached. -// -// Poll always checks 'condition' before waiting for the interval. 'condition' -// will always be invoked at least once. -// -// Some intervals may be missed if the condition takes too long or the time -// window is too short. -// -// If you want to Poll something forever, see PollInfinite. -func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error { - return pollImmediateInternal(poller(interval, timeout), condition) -} - -func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error { - done, err := condition() - if err != nil { - return err - } - if done { - return nil - } - return pollInternal(wait, condition) -} - -// PollInfinite tries a condition func until it returns true or an error -// -// PollInfinite always waits the interval before the run of 'condition'. -// -// Some intervals may be missed if the condition takes too long or the time -// window is too short. -func PollInfinite(interval time.Duration, condition ConditionFunc) error { - done := make(chan struct{}) - defer close(done) - return PollUntil(interval, condition, done) -} - -// PollImmediateInfinite tries a condition func until it returns true or an error -// -// PollImmediateInfinite runs the 'condition' before waiting for the interval. -// -// Some intervals may be missed if the condition takes too long or the time -// window is too short. -func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error { - done, err := condition() - if err != nil { - return err - } - if done { - return nil - } - return PollInfinite(interval, condition) -} - -// PollUntil tries a condition func until it returns true, an error or stopCh is -// closed. -// -// PolUntil always waits interval before the first run of 'condition'. -// 'condition' will always be invoked at least once. -func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { - return WaitFor(poller(interval, 0), condition, stopCh) -} - -// WaitFunc creates a channel that receives an item every time a test -// should be executed and is closed when the last test should be invoked. -type WaitFunc func(done <-chan struct{}) <-chan struct{} - -// WaitFor continually checks 'fn' as driven by 'wait'. -// -// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value -// placed on the channel and once more when the channel is closed. -// -// If 'fn' returns an error the loop ends and that error is returned, and if -// 'fn' returns true the loop ends and nil is returned. -// -// ErrWaitTimeout will be returned if the channel is closed without fn ever -// returning true. -func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { - c := wait(done) - for { - _, open := <-c - ok, err := fn() - if err != nil { - return err - } - if ok { - return nil - } - if !open { - break - } - } - return ErrWaitTimeout -} - -// poller returns a WaitFunc that will send to the channel every interval until -// timeout has elapsed and then closes the channel. -// -// Over very short intervals you may receive no ticks before the channel is -// closed. A timeout of 0 is interpreted as an infinity. -// -// Output ticks are not buffered. If the channel is not ready to receive an -// item, the tick is skipped. -func poller(interval, timeout time.Duration) WaitFunc { - return WaitFunc(func(done <-chan struct{}) <-chan struct{} { - ch := make(chan struct{}) - - go func() { - defer close(ch) - - tick := time.NewTicker(interval) - defer tick.Stop() - - var after <-chan time.Time - if timeout != 0 { - // time.After is more convenient, but it - // potentially leaves timers around much longer - // than necessary if we exit early. - timer := time.NewTimer(timeout) - after = timer.C - defer timer.Stop() - } - - for { - select { - case <-tick.C: - // If the consumer isn't ready for this signal drop it and - // check the other channels. - select { - case ch <- struct{}{}: - default: - } - case <-after: - return - case <-done: - return - } - } - }() - - return ch - }) -} diff --git a/vendor/github.com/k8sdb/apimachinery/pkg/controller/controller.go b/vendor/github.com/k8sdb/apimachinery/pkg/controller/controller.go deleted file mode 100644 index d949f1dc6..000000000 --- a/vendor/github.com/k8sdb/apimachinery/pkg/controller/controller.go +++ /dev/null @@ -1,19 +0,0 @@ -package controller - -import ( - "time" - - tcs "github.com/k8sdb/apimachinery/client/clientset" - clientset "k8s.io/client-go/kubernetes" -) - -type Controller struct { - // Kubernetes client - Client clientset.Interface - // ThirdPartyExtension client - ExtClient tcs.ExtensionInterface -} - -const ( - sleepDuration = time.Second * 10 -) diff --git a/vendor/github.com/k8sdb/apimachinery/pkg/controller/cron.go b/vendor/github.com/k8sdb/apimachinery/pkg/controller/cron.go deleted file mode 100644 index b82394f91..000000000 --- a/vendor/github.com/k8sdb/apimachinery/pkg/controller/cron.go +++ /dev/null @@ -1,246 +0,0 @@ -package controller - -import ( - "errors" - "fmt" - "sync" - "time" - - "github.com/appscode/log" - tapi "github.com/k8sdb/apimachinery/api" - tcs "github.com/k8sdb/apimachinery/client/clientset" - "github.com/k8sdb/apimachinery/pkg/eventer" - cmap "github.com/orcaman/concurrent-map" - "gopkg.in/robfig/cron.v2" - kerr "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - clientset "k8s.io/client-go/kubernetes" - apiv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" -) - -type CronControllerInterface interface { - StartCron() - ScheduleBackup(runtime.Object, metav1.ObjectMeta, *tapi.BackupScheduleSpec) error - StopBackupScheduling(metav1.ObjectMeta) - StopCron() -} - -type cronController struct { - // ThirdPartyExtension client - extClient tcs.ExtensionInterface - // For Internal Cron Job - cron *cron.Cron - // Store Cron Job EntryID for further use - cronEntryIDs cmap.ConcurrentMap - // Event Recorder - eventRecorder record.EventRecorder - // To perform start operation once - once sync.Once -} - -/* - NewCronController returns CronControllerInterface. - Need to call StartCron() method to start Cron. -*/ -func NewCronController(client clientset.Interface, extClient tcs.ExtensionInterface) CronControllerInterface { - return &cronController{ - extClient: extClient, - cron: cron.New(), - cronEntryIDs: cmap.New(), - eventRecorder: eventer.NewEventRecorder(client, "Cron Controller"), - } -} - -func (c *cronController) StartCron() { - c.once.Do(func() { - c.cron.Start() - }) -} - -func (c *cronController) ScheduleBackup( - // Runtime Object to push event - runtimeObj runtime.Object, - // ObjectMeta of Database TPR object - om metav1.ObjectMeta, - // BackupScheduleSpec - spec *tapi.BackupScheduleSpec, -) error { - // cronEntry name - cronEntryName := fmt.Sprintf("%v@%v", om.Name, om.Namespace) - - // Remove previous cron job if exist - if id, exists := c.cronEntryIDs.Pop(cronEntryName); exists { - c.cron.Remove(id.(cron.EntryID)) - } - - invoker := &snapshotInvoker{ - extClient: c.extClient, - runtimeObject: runtimeObj, - om: om, - spec: spec, - eventRecorder: c.eventRecorder, - } - - if err := invoker.validateScheduler(durationCheckSnapshotJob); err != nil { - return err - } - - // Set cron job - entryID, err := c.cron.AddFunc(spec.CronExpression, invoker.createScheduledSnapshot) - if err != nil { - return err - } - - // Add job entryID - c.cronEntryIDs.Set(cronEntryName, entryID) - return nil -} - -func (c *cronController) StopBackupScheduling(om metav1.ObjectMeta) { - // cronEntry name - cronEntryName := fmt.Sprintf("%v@%v", om.Name, om.Namespace) - - if id, exists := c.cronEntryIDs.Pop(cronEntryName); exists { - c.cron.Remove(id.(cron.EntryID)) - } -} - -func (c *cronController) StopCron() { - c.cron.Stop() -} - -type snapshotInvoker struct { - extClient tcs.ExtensionInterface - runtimeObject runtime.Object - om metav1.ObjectMeta - spec *tapi.BackupScheduleSpec - eventRecorder record.EventRecorder -} - -func (s *snapshotInvoker) validateScheduler(checkDuration time.Duration) error { - utc := time.Now().UTC() - snapshotName := fmt.Sprintf("%v-%v", s.om.Name, utc.Format("20060102-150405")) - if err := s.createSnapshot(snapshotName); err != nil { - return err - } - - var snapshotSuccess bool = false - - then := time.Now() - now := time.Now() - for now.Sub(then) < checkDuration { - snapshot, err := s.extClient.Snapshots(s.om.Namespace).Get(snapshotName) - if err != nil { - if kerr.IsNotFound(err) { - time.Sleep(sleepDuration) - now = time.Now() - continue - } else { - return err - } - } - - if snapshot.Status.Phase == tapi.SnapshotPhaseSuccessed { - snapshotSuccess = true - break - } - if snapshot.Status.Phase == tapi.SnapshotPhaseFailed { - break - } - - time.Sleep(sleepDuration) - now = time.Now() - } - - if !snapshotSuccess { - return errors.New("Failed to complete initial snapshot") - } - - return nil -} - -func (s *snapshotInvoker) createScheduledSnapshot() { - kind := s.runtimeObject.GetObjectKind().GroupVersionKind().Kind - name := s.om.Name - - labelMap := map[string]string{ - tapi.LabelDatabaseKind: kind, - tapi.LabelDatabaseName: name, - tapi.LabelSnapshotStatus: string(tapi.SnapshotPhaseRunning), - } - - snapshotList, err := s.extClient.Snapshots(s.om.Namespace).List(metav1.ListOptions{ - LabelSelector: labels.Set(labelMap).AsSelector().String(), - }) - if err != nil { - s.eventRecorder.Eventf( - s.runtimeObject, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToList, - "Failed to list Snapshots. Reason: %v", - err, - ) - log.Errorln(err) - return - } - - if len(snapshotList.Items) > 0 { - s.eventRecorder.Event( - s.runtimeObject, - apiv1.EventTypeNormal, - eventer.EventReasonIgnoredSnapshot, - "Skipping scheduled Backup. One is still active.", - ) - log.Debugln("Skipping scheduled Backup. One is still active.") - return - } - - // Set label. Elastic controller will detect this using label selector - labelMap = map[string]string{ - tapi.LabelDatabaseKind: kind, - tapi.LabelDatabaseName: name, - } - - now := time.Now().UTC() - snapshotName := fmt.Sprintf("%v-%v", s.om.Name, now.Format("20060102-150405")) - - if err = s.createSnapshot(snapshotName); err != nil { - log.Errorln(err) - } -} - -func (s *snapshotInvoker) createSnapshot(snapshotName string) error { - labelMap := map[string]string{ - tapi.LabelDatabaseKind: s.runtimeObject.GetObjectKind().GroupVersionKind().Kind, - tapi.LabelDatabaseName: s.om.Name, - } - - snapshot := &tapi.Snapshot{ - ObjectMeta: metav1.ObjectMeta{ - Name: snapshotName, - Namespace: s.om.Namespace, - Labels: labelMap, - }, - Spec: tapi.SnapshotSpec{ - DatabaseName: s.om.Name, - SnapshotStorageSpec: s.spec.SnapshotStorageSpec, - Resources: s.spec.Resources, - }, - } - - if _, err := s.extClient.Snapshots(snapshot.Namespace).Create(snapshot); err != nil { - s.eventRecorder.Eventf( - s.runtimeObject, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToCreate, - "Failed to create Snapshot. Reason: %v", - err, - ) - return err - } - - return nil -} diff --git a/vendor/github.com/k8sdb/apimachinery/pkg/controller/dormant_database.go b/vendor/github.com/k8sdb/apimachinery/pkg/controller/dormant_database.go deleted file mode 100644 index e3bc4e6c2..000000000 --- a/vendor/github.com/k8sdb/apimachinery/pkg/controller/dormant_database.go +++ /dev/null @@ -1,540 +0,0 @@ -package controller - -import ( - "errors" - "reflect" - "time" - - "github.com/appscode/go/wait" - "github.com/appscode/log" - tapi "github.com/k8sdb/apimachinery/api" - tcs "github.com/k8sdb/apimachinery/client/clientset" - "github.com/k8sdb/apimachinery/pkg/analytics" - "github.com/k8sdb/apimachinery/pkg/eventer" - kerr "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" - apiv1 "k8s.io/client-go/pkg/api/v1" - extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" -) - -type Deleter interface { - // Check Database TPR - Exists(*metav1.ObjectMeta) (bool, error) - // Pause operation - PauseDatabase(*tapi.DormantDatabase) error - // Wipe out operation - WipeOutDatabase(*tapi.DormantDatabase) error - // Resume operation - ResumeDatabase(*tapi.DormantDatabase) error -} - -type DormantDbController struct { - // Kubernetes client - client clientset.Interface - // ThirdPartyExtension client - extClient tcs.ExtensionInterface - // Deleter interface - deleter Deleter - // ListerWatcher - lw *cache.ListWatch - // Event Recorder - eventRecorder record.EventRecorder - // sync time to sync the list. - syncPeriod time.Duration -} - -// NewDormantDbController creates a new DormantDatabase Controller -func NewDormantDbController( - client clientset.Interface, - extClient tcs.ExtensionInterface, - deleter Deleter, - lw *cache.ListWatch, - syncPeriod time.Duration, -) *DormantDbController { - // return new DormantDatabase Controller - return &DormantDbController{ - client: client, - extClient: extClient, - deleter: deleter, - lw: lw, - eventRecorder: eventer.NewEventRecorder(client, "DormantDatabase Controller"), - syncPeriod: syncPeriod, - } -} - -func (c *DormantDbController) Run() { - // Ensure DormantDatabase TPR - c.ensureThirdPartyResource() - // Watch DormantDatabase with provided ListerWatcher - c.watch() -} - -// Ensure DormantDatabase ThirdPartyResource -func (c *DormantDbController) ensureThirdPartyResource() { - log.Infoln("Ensuring DormantDatabase ThirdPartyResource") - - resourceName := tapi.ResourceNameDormantDatabase + "." + tapi.V1alpha1SchemeGroupVersion.Group - var err error - if _, err = c.client.ExtensionsV1beta1().ThirdPartyResources().Get(resourceName, metav1.GetOptions{}); err == nil { - return - } - if !kerr.IsNotFound(err) { - log.Fatalln(err) - } - - thirdPartyResource := &extensions.ThirdPartyResource{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "extensions/v1beta1", - Kind: "ThirdPartyResource", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: resourceName, - Labels: map[string]string{ - "app": "kubedb", - }, - }, - Description: "Dormant KubeDB databases", - Versions: []extensions.APIVersion{ - { - Name: tapi.V1alpha1SchemeGroupVersion.Version, - }, - }, - } - if _, err := c.client.ExtensionsV1beta1().ThirdPartyResources().Create(thirdPartyResource); err != nil { - log.Fatalln(err) - } -} - -func (c *DormantDbController) watch() { - _, cacheController := cache.NewInformer(c.lw, - &tapi.DormantDatabase{}, - c.syncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - dormantDb := obj.(*tapi.DormantDatabase) - if dormantDb.Status.CreationTime == nil { - if err := c.create(dormantDb); err != nil { - dormantDbFailedToCreate() - log.Errorln(err) - } else { - dormantDbSuccessfullyCreated() - } - } - }, - DeleteFunc: func(obj interface{}) { - if err := c.delete(obj.(*tapi.DormantDatabase)); err != nil { - dormantDbFailedToDelete() - log.Errorln(err) - } else { - dormantDbSuccessfullyDeleted() - } - }, - UpdateFunc: func(old, new interface{}) { - oldDormantDb, ok := old.(*tapi.DormantDatabase) - if !ok { - return - } - newDormantDb, ok := new.(*tapi.DormantDatabase) - if !ok { - return - } - // TODO: Find appropriate checking - // Only allow if Spec varies - if !reflect.DeepEqual(oldDormantDb.Spec, newDormantDb.Spec) { - if err := c.update(oldDormantDb, newDormantDb); err != nil { - log.Errorln(err) - } - } - }, - }, - ) - cacheController.Run(wait.NeverStop) -} - -func (c *DormantDbController) create(dormantDb *tapi.DormantDatabase) error { - - var err error - if dormantDb, err = c.extClient.DormantDatabases(dormantDb.Namespace).Get(dormantDb.Name); err != nil { - return err - } - - // Set DormantDatabase Phase: Deleting - t := metav1.Now() - dormantDb.Status.CreationTime = &t - if _, err := c.extClient.DormantDatabases(dormantDb.Namespace).Update(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update DormantDatabase. Reason: %v", - err, - ) - return err - } - - // Check if DB TPR object exists - found, err := c.deleter.Exists(&dormantDb.ObjectMeta) - if err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToPause, - "Failed to pause Database. Reason: %v", - err, - ) - return err - } - - if found { - message := "Failed to pause Database. Delete Database TPR object first" - c.eventRecorder.Event( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToPause, - message, - ) - - // Delete DormantDatabase object - if err := c.extClient.DormantDatabases(dormantDb.Namespace).Delete(dormantDb.Name); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete DormantDatabase. Reason: %v", - err, - ) - log.Errorln(err) - } - return errors.New(message) - } - - if dormantDb, err = c.extClient.DormantDatabases(dormantDb.Namespace).Get(dormantDb.Name); err != nil { - return err - } - - // Set DormantDatabase Phase: Deleting - t = metav1.Now() - dormantDb.Status.Phase = tapi.DormantDatabasePhasePausing - if _, err = c.extClient.DormantDatabases(dormantDb.Namespace).Update(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update DormantDatabase. Reason: %v", - err, - ) - return err - } - - c.eventRecorder.Event(dormantDb, apiv1.EventTypeNormal, eventer.EventReasonPausing, "Pausing Database") - - // Pause Database workload - if err := c.deleter.PauseDatabase(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to pause. Reason: %v", - err, - ) - return err - } - - c.eventRecorder.Event( - dormantDb, - apiv1.EventTypeNormal, - eventer.EventReasonSuccessfulPause, - "Successfully paused Database workload", - ) - - if dormantDb, err = c.extClient.DormantDatabases(dormantDb.Namespace).Get(dormantDb.Name); err != nil { - return err - } - - // Set DormantDatabase Phase: Paused - t = metav1.Now() - dormantDb.Status.PausingTime = &t - dormantDb.Status.Phase = tapi.DormantDatabasePhasePaused - if _, err = c.extClient.DormantDatabases(dormantDb.Namespace).Update(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update DormantDatabase. Reason: %v", - err, - ) - return err - } - - return nil -} - -func (c *DormantDbController) delete(dormantDb *tapi.DormantDatabase) error { - phase := dormantDb.Status.Phase - if phase != tapi.DormantDatabasePhaseResuming && phase != tapi.DormantDatabasePhaseWipedOut { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - `DormantDatabase "%v" is not %v.`, - dormantDb.Name, - tapi.DormantDatabasePhaseWipedOut, - ) - - if err := c.reCreateDormantDatabase(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToCreate, - `Failed to recreate DormantDatabase: "%v". Reason: %v`, - dormantDb.Name, - err, - ) - return err - } - } - return nil -} - -func (c *DormantDbController) update(oldDormantDb, updatedDormantDb *tapi.DormantDatabase) error { - if oldDormantDb.Spec.WipeOut != updatedDormantDb.Spec.WipeOut && updatedDormantDb.Spec.WipeOut { - return c.wipeOut(updatedDormantDb) - } - - if oldDormantDb.Spec.Resume != updatedDormantDb.Spec.Resume && updatedDormantDb.Spec.Resume { - if oldDormantDb.Status.Phase == tapi.DormantDatabasePhasePaused { - return c.resume(updatedDormantDb) - } else { - message := "Failed to resume Database. " + - "Only DormantDatabase of \"Paused\" Phase can be resumed" - c.eventRecorder.Event( - updatedDormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - message, - ) - } - } - return nil -} - -func (c *DormantDbController) wipeOut(dormantDb *tapi.DormantDatabase) error { - // Check if DB TPR object exists - found, err := c.deleter.Exists(&dormantDb.ObjectMeta) - if err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to wipeOut Database. Reason: %v", - err, - ) - return err - } - - if found { - message := "Failed to wipeOut Database. Delete Database TPR object first" - c.eventRecorder.Event( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToWipeOut, - message, - ) - - // Delete DormantDatabase object - if err := c.extClient.DormantDatabases(dormantDb.Namespace).Delete(dormantDb.Name); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete DormantDatabase. Reason: %v", - err, - ) - log.Errorln(err) - } - return errors.New(message) - } - - if dormantDb, err = c.extClient.DormantDatabases(dormantDb.Namespace).Get(dormantDb.Name); err != nil { - return err - } - - // Set DormantDatabase Phase: Wiping out - t := metav1.Now() - dormantDb.Status.Phase = tapi.DormantDatabasePhaseWipingOut - - if _, err := c.extClient.DormantDatabases(dormantDb.Namespace).Update(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update DormantDatabase. Reason: %v", - err, - ) - return err - } - - // Wipe out Database workload - c.eventRecorder.Event(dormantDb, apiv1.EventTypeNormal, eventer.EventReasonWipingOut, "Wiping out Database") - if err := c.deleter.WipeOutDatabase(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToWipeOut, - "Failed to wipeOut. Reason: %v", - err, - ) - return err - } - - c.eventRecorder.Event( - dormantDb, - apiv1.EventTypeNormal, - eventer.EventReasonSuccessfulWipeOut, - "Successfully wiped out Database workload", - ) - - if dormantDb, err = c.extClient.DormantDatabases(dormantDb.Namespace).Get(dormantDb.Name); err != nil { - return err - } - - // Set DormantDatabase Phase: Deleted - t = metav1.Now() - dormantDb.Status.WipeOutTime = &t - dormantDb.Status.Phase = tapi.DormantDatabasePhaseWipedOut - if _, err = c.extClient.DormantDatabases(dormantDb.Namespace).Update(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update DormantDatabase. Reason: %v", - err, - ) - return err - } - - return nil -} - -func (c *DormantDbController) resume(dormantDb *tapi.DormantDatabase) error { - c.eventRecorder.Event( - dormantDb, - apiv1.EventTypeNormal, - eventer.EventReasonResuming, - "Resuming DormantDatabase", - ) - - // Check if DB TPR object exists - found, err := c.deleter.Exists(&dormantDb.ObjectMeta) - if err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToResume, - "Failed to resume DormantDatabase. Reason: %v", - err, - ) - return err - } - - if found { - message := "Failed to resume DormantDatabase. One Database TPR object exists with same name" - c.eventRecorder.Event( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToResume, - message, - ) - return errors.New(message) - } - - if dormantDb, err = c.extClient.DormantDatabases(dormantDb.Namespace).Get(dormantDb.Name); err != nil { - return err - } - - _dormantDb := dormantDb - _dormantDb.Status.Phase = tapi.DormantDatabasePhaseResuming - if _, err = c.extClient.DormantDatabases(_dormantDb.Namespace).Update(_dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update DormantDatabase. Reason: %v", - err, - ) - return err - } - - if err = c.extClient.DormantDatabases(dormantDb.Namespace).Delete(dormantDb.Name); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete DormantDatabase. Reason: %v", - err, - ) - return err - } - - if err = c.deleter.ResumeDatabase(dormantDb); err != nil { - if err := c.reCreateDormantDatabase(dormantDb); err != nil { - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToCreate, - `Failed to recreate DormantDatabase: "%v". Reason: %v`, - dormantDb.Name, - err, - ) - return err - } - - c.eventRecorder.Eventf( - dormantDb, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToResume, - "Failed to resume Database. Reason: %v", - err, - ) - return err - } - return nil -} - -func (c *DormantDbController) reCreateDormantDatabase(dormantDb *tapi.DormantDatabase) error { - _dormantDb := &tapi.DormantDatabase{ - ObjectMeta: metav1.ObjectMeta{ - Name: dormantDb.Name, - Namespace: dormantDb.Namespace, - Labels: dormantDb.Labels, - Annotations: dormantDb.Annotations, - }, - Spec: dormantDb.Spec, - Status: dormantDb.Status, - } - - if _, err := c.extClient.DormantDatabases(_dormantDb.Namespace).Create(_dormantDb); err != nil { - return err - } - - return nil -} - -func dormantDbSuccessfullyCreated() { - analytics.SendEvent(tapi.ResourceNameDormantDatabase, "created", "success") -} - -func dormantDbFailedToCreate() { - analytics.SendEvent(tapi.ResourceNameDormantDatabase, "created", "failure") -} - -func dormantDbSuccessfullyDeleted() { - analytics.SendEvent(tapi.ResourceNameDormantDatabase, "deleted", "success") -} - -func dormantDbFailedToDelete() { - analytics.SendEvent(tapi.ResourceNameDormantDatabase, "deleted", "failure") -} diff --git a/vendor/github.com/k8sdb/apimachinery/pkg/controller/lib.go b/vendor/github.com/k8sdb/apimachinery/pkg/controller/lib.go deleted file mode 100644 index f120343c5..000000000 --- a/vendor/github.com/k8sdb/apimachinery/pkg/controller/lib.go +++ /dev/null @@ -1,351 +0,0 @@ -package controller - -import ( - "errors" - "fmt" - "time" - - "github.com/appscode/log" - "github.com/graymeta/stow" - _ "github.com/graymeta/stow/azure" - _ "github.com/graymeta/stow/google" - _ "github.com/graymeta/stow/s3" - tapi "github.com/k8sdb/apimachinery/api" - "github.com/k8sdb/apimachinery/pkg/eventer" - "github.com/k8sdb/apimachinery/pkg/storage" - kerr "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - apiv1 "k8s.io/client-go/pkg/api/v1" - apps "k8s.io/client-go/pkg/apis/apps/v1beta1" - batch "k8s.io/client-go/pkg/apis/batch/v1" - "k8s.io/client-go/tools/record" -) - -func (c *Controller) CheckStatefulSetPodStatus(statefulSet *apps.StatefulSet, checkDuration time.Duration) error { - podName := fmt.Sprintf("%v-%v", statefulSet.Name, 0) - - podReady := false - then := time.Now() - now := time.Now() - for now.Sub(then) < checkDuration { - pod, err := c.Client.CoreV1().Pods(statefulSet.Namespace).Get(podName, metav1.GetOptions{}) - if err != nil { - if kerr.IsNotFound(err) { - _, err := c.Client.AppsV1beta1().StatefulSets(statefulSet.Namespace).Get(statefulSet.Name, metav1.GetOptions{}) - if kerr.IsNotFound(err) { - break - } - - time.Sleep(sleepDuration) - now = time.Now() - continue - } else { - return err - } - } - log.Debugf("Pod Phase: %v", pod.Status.Phase) - - // If job is success - if pod.Status.Phase == apiv1.PodRunning { - podReady = true - break - } - - time.Sleep(sleepDuration) - now = time.Now() - } - if !podReady { - return errors.New("Database fails to be Ready") - } - return nil -} - -func (c *Controller) DeletePersistentVolumeClaims(namespace string, selector labels.Selector) error { - pvcList, err := c.Client.CoreV1().PersistentVolumeClaims(namespace).List( - metav1.ListOptions{ - LabelSelector: selector.String(), - }, - ) - if err != nil { - return err - } - - for _, pvc := range pvcList.Items { - if err := c.Client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, nil); err != nil { - return err - } - } - return nil -} - -func (c *Controller) DeleteSnapshotData(snapshot *tapi.Snapshot) error { - cfg, err := storage.NewOSMContext(c.Client, snapshot.Spec.SnapshotStorageSpec, snapshot.Namespace) - if err != nil { - return err - } - - loc, err := stow.Dial(cfg.Provider, cfg.Config) - if err != nil { - return err - } - bucket, err := snapshot.Spec.SnapshotStorageSpec.Container() - if err != nil { - return err - } - container, err := loc.Container(bucket) - if err != nil { - return err - } - - prefix, _ := snapshot.Location() // error checked by .Container() - cursor := stow.CursorStart - for { - items, next, err := container.Items(prefix, cursor, 50) - if err != nil { - return err - } - for _, item := range items { - if err := container.RemoveItem(item.ID()); err != nil { - return err - } - } - cursor = next - if stow.IsCursorEnd(cursor) { - break - } - } - - return nil -} - -func (c *Controller) DeleteSnapshots(namespace string, selector labels.Selector) error { - snapshotList, err := c.ExtClient.Snapshots(namespace).List( - metav1.ListOptions{ - LabelSelector: selector.String(), - }, - ) - if err != nil { - return err - } - - for _, snapshot := range snapshotList.Items { - if err := c.ExtClient.Snapshots(snapshot.Namespace).Delete(snapshot.Name); err != nil { - return err - } - } - return nil -} - -func (c *Controller) CheckDatabaseRestoreJob( - job *batch.Job, - runtimeObj runtime.Object, - recorder record.EventRecorder, - checkDuration time.Duration, -) bool { - var jobSuccess bool = false - var err error - - then := time.Now() - now := time.Now() - for now.Sub(then) < checkDuration { - log.Debugln("Checking for Job ", job.Name) - job, err = c.Client.BatchV1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) - if err != nil { - if kerr.IsNotFound(err) { - time.Sleep(sleepDuration) - now = time.Now() - continue - } - recorder.Eventf( - runtimeObj, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToList, - "Failed to get Job. Reason: %v", - err, - ) - log.Errorln(err) - return jobSuccess - } - log.Debugf("Pods Statuses: %d Running / %d Succeeded / %d Failed", - job.Status.Active, job.Status.Succeeded, job.Status.Failed) - // If job is success - if job.Status.Succeeded > 0 { - jobSuccess = true - break - } else if job.Status.Failed > 0 { - break - } - - time.Sleep(sleepDuration) - now = time.Now() - } - - if err != nil { - return false - } - - r, err := metav1.LabelSelectorAsSelector(job.Spec.Selector) - if err != nil { - return false - } - err = c.Client.CoreV1().Pods(job.Namespace).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{ - LabelSelector: r.String(), - }) - if err != nil { - recorder.Eventf( - runtimeObj, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToList, - "Failed to list Pods. Reason: %v", - err, - ) - log.Errorln(err) - return jobSuccess - } - - err = c.Client.CoreV1().Secrets(job.Namespace).Delete(job.Name, &metav1.DeleteOptions{}) - if err != nil { - return false - } - - for _, volume := range job.Spec.Template.Spec.Volumes { - claim := volume.PersistentVolumeClaim - if claim != nil { - err := c.Client.CoreV1().PersistentVolumeClaims(job.Namespace).Delete(claim.ClaimName, nil) - if err != nil { - recorder.Eventf( - runtimeObj, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete PersistentVolumeClaim. Reason: %v", - err, - ) - log.Errorln(err) - } - } - } - - if err := c.Client.BatchV1().Jobs(job.Namespace).Delete(job.Name, nil); err != nil { - recorder.Eventf( - runtimeObj, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete Job. Reason: %v", - err, - ) - log.Errorln(err) - } - - return jobSuccess -} - -func (c *Controller) checkGoverningService(name, namespace string) (bool, error) { - _, err := c.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - if kerr.IsNotFound(err) { - return false, nil - } else { - return false, err - } - } - - return true, nil -} - -func (c *Controller) CreateGoverningService(name, namespace string) error { - // Check if service name exists - found, err := c.checkGoverningService(name, namespace) - if err != nil { - return err - } - if found { - return nil - } - - service := &apiv1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: apiv1.ServiceSpec{ - Type: apiv1.ServiceTypeClusterIP, - ClusterIP: apiv1.ClusterIPNone, - }, - } - _, err = c.Client.CoreV1().Services(namespace).Create(service) - return err -} - -func (c *Controller) DeleteService(name, namespace string) error { - service, err := c.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - if kerr.IsNotFound(err) { - return nil - } else { - return err - } - } - - if service.Spec.Selector[tapi.LabelDatabaseName] != name { - return nil - } - - return c.Client.CoreV1().Services(namespace).Delete(name, nil) -} - -func (c *Controller) DeleteStatefulSet(name, namespace string) error { - statefulSet, err := c.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - if kerr.IsNotFound(err) { - return nil - } else { - return err - } - } - - // Update StatefulSet - replicas := int32(0) - statefulSet.Spec.Replicas = &replicas - if _, err := c.Client.AppsV1beta1().StatefulSets(statefulSet.Namespace).Update(statefulSet); err != nil { - return err - } - - var checkSuccess bool = false - then := time.Now() - now := time.Now() - for now.Sub(then) < time.Minute*10 { - podList, err := c.Client.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{ - LabelSelector: labels.Set(statefulSet.Spec.Selector.MatchLabels).AsSelector().String(), - }) - if err != nil { - return err - } - if len(podList.Items) == 0 { - checkSuccess = true - break - } - - time.Sleep(sleepDuration) - now = time.Now() - } - - if !checkSuccess { - return errors.New("Fail to delete StatefulSet Pods") - } - - // Delete StatefulSet - return c.Client.AppsV1beta1().StatefulSets(statefulSet.Namespace).Delete(statefulSet.Name, nil) -} - -func (c *Controller) DeleteSecret(name, namespace string) error { - if _, err := c.Client.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{}); err != nil { - if kerr.IsNotFound(err) { - return nil - } else { - return err - } - } - - return c.Client.CoreV1().Secrets(namespace).Delete(name, nil) -} diff --git a/vendor/github.com/k8sdb/apimachinery/pkg/controller/snapshot.go b/vendor/github.com/k8sdb/apimachinery/pkg/controller/snapshot.go deleted file mode 100644 index b90806372..000000000 --- a/vendor/github.com/k8sdb/apimachinery/pkg/controller/snapshot.go +++ /dev/null @@ -1,455 +0,0 @@ -package controller - -import ( - "fmt" - "time" - - "github.com/appscode/go/wait" - "github.com/appscode/log" - tapi "github.com/k8sdb/apimachinery/api" - tcs "github.com/k8sdb/apimachinery/client/clientset" - "github.com/k8sdb/apimachinery/pkg/analytics" - "github.com/k8sdb/apimachinery/pkg/eventer" - "github.com/k8sdb/apimachinery/pkg/storage" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - clientset "k8s.io/client-go/kubernetes" - apiv1 "k8s.io/client-go/pkg/api/v1" - batch "k8s.io/client-go/pkg/apis/batch/v1" - extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" -) - -type Snapshotter interface { - ValidateSnapshot(*tapi.Snapshot) error - GetDatabase(*tapi.Snapshot) (runtime.Object, error) - GetSnapshotter(*tapi.Snapshot) (*batch.Job, error) - WipeOutSnapshot(*tapi.Snapshot) error -} - -type SnapshotController struct { - // Kubernetes client - client clientset.Interface - // ThirdPartyExtension client - extClient tcs.ExtensionInterface - // Snapshotter interface - snapshoter Snapshotter - // ListerWatcher - lw *cache.ListWatch - // Event Recorder - eventRecorder record.EventRecorder - // sync time to sync the list. - syncPeriod time.Duration -} - -// NewSnapshotController creates a new SnapshotController -func NewSnapshotController( - client clientset.Interface, - extClient tcs.ExtensionInterface, - snapshoter Snapshotter, - lw *cache.ListWatch, - syncPeriod time.Duration, -) *SnapshotController { - - // return new DormantDatabase Controller - return &SnapshotController{ - client: client, - extClient: extClient, - snapshoter: snapshoter, - lw: lw, - eventRecorder: eventer.NewEventRecorder(client, "Snapshot Controller"), - syncPeriod: syncPeriod, - } -} - -func (c *SnapshotController) Run() { - // Ensure DormantDatabase TPR - c.ensureThirdPartyResource() - // Watch DormantDatabase with provided ListerWatcher - c.watch() -} - -// Ensure Snapshot ThirdPartyResource -func (c *SnapshotController) ensureThirdPartyResource() { - log.Infoln("Ensuring Snapshot ThirdPartyResource") - - resourceName := tapi.ResourceNameSnapshot + "." + tapi.V1alpha1SchemeGroupVersion.Group - var err error - if _, err = c.client.ExtensionsV1beta1().ThirdPartyResources().Get(resourceName, metav1.GetOptions{}); err == nil { - return - } - if !errors.IsNotFound(err) { - log.Fatalln(err) - } - - thirdPartyResource := &extensions.ThirdPartyResource{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "extensions/v1beta1", - Kind: "ThirdPartyResource", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: resourceName, - Labels: map[string]string{ - "app": "kubedb", - }, - }, - Description: "Snapshot of KubeDB databases", - Versions: []extensions.APIVersion{ - { - Name: tapi.V1alpha1SchemeGroupVersion.Version, - }, - }, - } - if _, err := c.client.ExtensionsV1beta1().ThirdPartyResources().Create(thirdPartyResource); err != nil { - log.Fatalln(err) - } -} - -func (c *SnapshotController) watch() { - _, cacheController := cache.NewInformer(c.lw, - &tapi.Snapshot{}, - c.syncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - snapshot := obj.(*tapi.Snapshot) - if snapshot.Status.StartTime == nil { - if err := c.create(snapshot); err != nil { - snapshotFailedToCreate() - log.Errorln(err) - } else { - snapshotSuccessfullyCreated() - } - } - }, - DeleteFunc: func(obj interface{}) { - snapshot := obj.(*tapi.Snapshot) - if err := c.delete(snapshot); err != nil { - snapshotFailedToDelete() - log.Errorln(err) - } else { - snapshotSuccessfullyDeleted() - } - }, - }, - ) - cacheController.Run(wait.NeverStop) -} - -const ( - durationCheckSnapshotJob = time.Minute * 30 -) - -func (c *SnapshotController) create(snapshot *tapi.Snapshot) error { - var err error - if snapshot, err = c.extClient.Snapshots(snapshot.Namespace).Get(snapshot.Name); err != nil { - return err - } - - t := metav1.Now() - snapshot.Status.StartTime = &t - if _, err = c.extClient.Snapshots(snapshot.Namespace).Update(snapshot); err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - `Fail to update Elastic: "%v". Reason: %v`, - snapshot.Name, - err, - ) - log.Errorln(err) - } - - // Validate DatabaseSnapshot spec - if err := c.snapshoter.ValidateSnapshot(snapshot); err != nil { - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonInvalid, err.Error()) - return err - } - - runtimeObj, err := c.snapshoter.GetDatabase(snapshot) - if err != nil { - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonFailedToGet, err.Error()) - return err - } - - if snapshot, err = c.extClient.Snapshots(snapshot.Namespace).Get(snapshot.Name); err != nil { - return err - } - - snapshot.Labels[tapi.LabelDatabaseName] = snapshot.Spec.DatabaseName - snapshot.Labels[tapi.LabelSnapshotStatus] = string(tapi.SnapshotPhaseRunning) - snapshot.Status.Phase = tapi.SnapshotPhaseRunning - if _, err = c.extClient.Snapshots(snapshot.Namespace).Update(snapshot); err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update Snapshot. Reason: %v", - err, - ) - log.Errorln(err) - } - - c.eventRecorder.Event(runtimeObj, apiv1.EventTypeNormal, eventer.EventReasonStarting, "Backup running") - c.eventRecorder.Event(snapshot, apiv1.EventTypeNormal, eventer.EventReasonStarting, "Backup running") - - secret, err := storage.NewOSMSecret(c.client, snapshot) - if err != nil { - message := fmt.Sprintf("Failed to generate osm secret. Reason: %v", err) - c.eventRecorder.Event(runtimeObj, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - return err - } - _, err = c.client.CoreV1().Secrets(secret.Namespace).Create(secret) - if err != nil { - message := fmt.Sprintf("Failed to create osm secret. Reason: %v", err) - c.eventRecorder.Event(runtimeObj, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - return err - } - - job, err := c.snapshoter.GetSnapshotter(snapshot) - if err != nil { - message := fmt.Sprintf("Failed to take snapshot. Reason: %v", err) - c.eventRecorder.Event(runtimeObj, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - return err - } - if _, err := c.client.BatchV1().Jobs(snapshot.Namespace).Create(job); err != nil { - message := fmt.Sprintf("Failed to take snapshot. Reason: %v", err) - c.eventRecorder.Event(runtimeObj, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonSnapshotFailed, message) - return err - } - - go func() { - if err := c.checkSnapshotJob(snapshot, job.Name, durationCheckSnapshotJob); err != nil { - log.Errorln(err) - } - }() - - return nil -} - -func (c *SnapshotController) delete(snapshot *tapi.Snapshot) error { - runtimeObj, err := c.snapshoter.GetDatabase(snapshot) - if err != nil { - if !errors.IsNotFound(err) { - c.eventRecorder.Event( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToGet, - err.Error(), - ) - return err - } - } - - if runtimeObj != nil { - c.eventRecorder.Eventf( - runtimeObj, - apiv1.EventTypeNormal, - eventer.EventReasonWipingOut, - "Wiping out Snapshot: %v", - snapshot.Name, - ) - } - - if err := c.snapshoter.WipeOutSnapshot(snapshot); err != nil { - if runtimeObj != nil { - c.eventRecorder.Eventf( - runtimeObj, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToWipeOut, - "Failed to wipeOut. Reason: %v", - err, - ) - } - return err - } - - if runtimeObj != nil { - c.eventRecorder.Eventf( - runtimeObj, - apiv1.EventTypeNormal, - eventer.EventReasonSuccessfulWipeOut, - "Successfully wiped out Snapshot: %v", - snapshot.Name, - ) - } - return nil -} - -func (c *SnapshotController) checkSnapshotJob(snapshot *tapi.Snapshot, jobName string, checkDuration time.Duration) error { - - var jobSuccess bool = false - var job *batch.Job - var err error - then := time.Now() - now := time.Now() - for now.Sub(then) < checkDuration { - log.Debugln("Checking for Job ", jobName) - job, err = c.client.BatchV1().Jobs(snapshot.Namespace).Get(jobName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - time.Sleep(sleepDuration) - now = time.Now() - continue - } - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToList, - "Failed to get Job. Reason: %v", - err, - ) - return err - } - log.Debugf("Pods Statuses: %d Running / %d Succeeded / %d Failed", - job.Status.Active, job.Status.Succeeded, job.Status.Failed) - // If job is success - if job.Status.Succeeded > 0 { - jobSuccess = true - break - } else if job.Status.Failed > 0 { - break - } - - time.Sleep(sleepDuration) - now = time.Now() - } - - if err != nil { - return err - } - - r, err := metav1.LabelSelectorAsSelector(job.Spec.Selector) - if err != nil { - return err - } - err = c.client.CoreV1().Pods(job.Namespace).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{ - LabelSelector: r.String(), - }) - if err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete Pods. Reason: %v", - err, - ) - log.Errorln(err) - } - - err = c.client.CoreV1().Secrets(snapshot.Namespace).Delete(snapshot.Name, &metav1.DeleteOptions{}) - if err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete Secret. Reason: %v", - err, - ) - log.Errorln(err) - } - - for _, volume := range job.Spec.Template.Spec.Volumes { - claim := volume.PersistentVolumeClaim - if claim != nil { - err := c.client.CoreV1().PersistentVolumeClaims(job.Namespace).Delete(claim.ClaimName, nil) - if err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete PersistentVolumeClaim. Reason: %v", - err, - ) - log.Errorln(err) - } - } - } - - if err := c.client.BatchV1().Jobs(job.Namespace).Delete(job.Name, nil); err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToDelete, - "Failed to delete Job. Reason: %v", - err, - ) - log.Errorln(err) - } - - if snapshot, err = c.extClient.Snapshots(snapshot.Namespace).Get(snapshot.Name); err != nil { - return err - } - - runtimeObj, err := c.snapshoter.GetDatabase(snapshot) - if err != nil { - c.eventRecorder.Event(snapshot, apiv1.EventTypeWarning, eventer.EventReasonFailedToGet, err.Error()) - return err - } - - t := metav1.Now() - snapshot.Status.CompletionTime = &t - if jobSuccess { - snapshot.Status.Phase = tapi.SnapshotPhaseSuccessed - c.eventRecorder.Event( - runtimeObj, - apiv1.EventTypeNormal, - eventer.EventReasonSuccessfulSnapshot, - "Successfully completed snapshot", - ) - c.eventRecorder.Event( - snapshot, - apiv1.EventTypeNormal, - eventer.EventReasonSuccessfulSnapshot, - "Successfully completed snapshot", - ) - } else { - snapshot.Status.Phase = tapi.SnapshotPhaseFailed - c.eventRecorder.Event( - runtimeObj, - apiv1.EventTypeWarning, - eventer.EventReasonSnapshotFailed, - "Failed to complete snapshot", - ) - c.eventRecorder.Event( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonSnapshotFailed, - "Failed to complete snapshot", - ) - } - - delete(snapshot.Labels, tapi.LabelSnapshotStatus) - if _, err := c.extClient.Snapshots(snapshot.Namespace).Update(snapshot); err != nil { - c.eventRecorder.Eventf( - snapshot, - apiv1.EventTypeWarning, - eventer.EventReasonFailedToUpdate, - "Failed to update Snapshot. Reason: %v", - err, - ) - log.Errorln(err) - } - return nil -} - -func snapshotSuccessfullyCreated() { - analytics.SendEvent(tapi.ResourceNameSnapshot, "created", "success") -} - -func snapshotFailedToCreate() { - analytics.SendEvent(tapi.ResourceNameSnapshot, "created", "failure") -} - -func snapshotSuccessfullyDeleted() { - analytics.SendEvent(tapi.ResourceNameSnapshot, "deleted", "success") -} - -func snapshotFailedToDelete() { - analytics.SendEvent(tapi.ResourceNameSnapshot, "deleted", "failure") -} diff --git a/vendor/github.com/k8sdb/apimachinery/pkg/eventer/event_recorder.go b/vendor/github.com/k8sdb/apimachinery/pkg/eventer/event_recorder.go deleted file mode 100644 index 12b23072b..000000000 --- a/vendor/github.com/k8sdb/apimachinery/pkg/eventer/event_recorder.go +++ /dev/null @@ -1,60 +0,0 @@ -package eventer - -import ( - "github.com/appscode/log" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/pkg/api" - apiv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" -) - -const ( - EventReasonCreating string = "Creating" - EventReasonPausing string = "Pausing" - EventReasonWipingOut string = "WipingOut" - EventReasonFailedToCreate string = "Failed" - EventReasonFailedToPause string = "Failed" - EventReasonFailedToDelete string = "Failed" - EventReasonFailedToWipeOut string = "Failed" - EventReasonFailedToGet string = "Failed" - EventReasonFailedToInitialize string = "Failed" - EventReasonFailedToList string = "Failed" - EventReasonFailedToResume string = "Failed" - EventReasonFailedToSchedule string = "Failed" - EventReasonFailedToStart string = "Failed" - EventReasonFailedToUpdate string = "Failed" - EventReasonFailedToAddMonitor string = "Failed" - EventReasonFailedToDeleteMonitor string = "Failed" - EventReasonFailedToUpdateMonitor string = "Failed" - EventReasonIgnoredSnapshot string = "IgnoredSnapshot" - EventReasonInitializing string = "Initializing" - EventReasonInvalid string = "Invalid" - EventReasonInvalidUpdate string = "InvalidUpdate" - EventReasonResuming string = "Resuming" - EventReasonSnapshotFailed string = "SnapshotFailed" - EventReasonStarting string = "Starting" - EventReasonSuccessfulCreate string = "SuccessfulCreate" - EventReasonSuccessfulPause string = "SuccessfulPause" - EventReasonSuccessfulMonitorAdd string = "SuccessfulMonitorAdd" - EventReasonSuccessfulMonitorDelete string = "SuccessfulMonitorDelete" - EventReasonSuccessfulMonitorUpdate string = "SuccessfulMonitorUpdate" - EventReasonSuccessfulResume string = "SuccessfulResume" - EventReasonSuccessfulWipeOut string = "SuccessfulWipeOut" - EventReasonSuccessfulSnapshot string = "SuccessfulSnapshot" - EventReasonSuccessfulValidate string = "SuccessfulValidate" - EventReasonSuccessfulInitialize string = "SuccessfulInitialize" -) - -func NewEventRecorder(client clientset.Interface, component string) record.EventRecorder { - // Event Broadcaster - broadcaster := record.NewBroadcaster() - broadcaster.StartEventWatcher( - func(event *apiv1.Event) { - if _, err := client.Core().Events(event.Namespace).Create(event); err != nil { - log.Errorln(err) - } - }, - ) - - return broadcaster.NewRecorder(api.Scheme, apiv1.EventSource{Component: component}) -} diff --git a/vendor/github.com/orcaman/concurrent-map/LICENSE b/vendor/github.com/orcaman/concurrent-map/LICENSE deleted file mode 100644 index ea2fec0c5..000000000 --- a/vendor/github.com/orcaman/concurrent-map/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2014 streamrail - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. - diff --git a/vendor/github.com/orcaman/concurrent-map/concurrent_map.go b/vendor/github.com/orcaman/concurrent-map/concurrent_map.go deleted file mode 100644 index faa55ea27..000000000 --- a/vendor/github.com/orcaman/concurrent-map/concurrent_map.go +++ /dev/null @@ -1,315 +0,0 @@ -package cmap - -import ( - "encoding/json" - "sync" -) - -var SHARD_COUNT = 32 - -// A "thread" safe map of type string:Anything. -// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. -type ConcurrentMap []*ConcurrentMapShared - -// A "thread" safe string to anything map. -type ConcurrentMapShared struct { - items map[string]interface{} - sync.RWMutex // Read Write mutex, guards access to internal map. -} - -// Creates a new concurrent map. -func New() ConcurrentMap { - m := make(ConcurrentMap, SHARD_COUNT) - for i := 0; i < SHARD_COUNT; i++ { - m[i] = &ConcurrentMapShared{items: make(map[string]interface{})} - } - return m -} - -// Returns shard under given key -func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { - return m[uint(fnv32(key))%uint(SHARD_COUNT)] -} - -func (m ConcurrentMap) MSet(data map[string]interface{}) { - for key, value := range data { - shard := m.GetShard(key) - shard.Lock() - shard.items[key] = value - shard.Unlock() - } -} - -// Sets the given value under the specified key. -func (m ConcurrentMap) Set(key string, value interface{}) { - // Get map shard. - shard := m.GetShard(key) - shard.Lock() - shard.items[key] = value - shard.Unlock() -} - -// Callback to return new element to be inserted into the map -// It is called while lock is held, therefore it MUST NOT -// try to access other keys in same map, as it can lead to deadlock since -// Go sync.RWLock is not reentrant -type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} - -// Insert or Update - updates existing element or inserts a new one using UpsertCb -func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { - shard := m.GetShard(key) - shard.Lock() - v, ok := shard.items[key] - res = cb(ok, v, value) - shard.items[key] = res - shard.Unlock() - return res -} - -// Sets the given value under the specified key if no value was associated with it. -func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { - // Get map shard. - shard := m.GetShard(key) - shard.Lock() - _, ok := shard.items[key] - if !ok { - shard.items[key] = value - } - shard.Unlock() - return !ok -} - -// Retrieves an element from map under given key. -func (m ConcurrentMap) Get(key string) (interface{}, bool) { - // Get shard - shard := m.GetShard(key) - shard.RLock() - // Get item from shard. - val, ok := shard.items[key] - shard.RUnlock() - return val, ok -} - -// Returns the number of elements within the map. -func (m ConcurrentMap) Count() int { - count := 0 - for i := 0; i < SHARD_COUNT; i++ { - shard := m[i] - shard.RLock() - count += len(shard.items) - shard.RUnlock() - } - return count -} - -// Looks up an item under specified key -func (m ConcurrentMap) Has(key string) bool { - // Get shard - shard := m.GetShard(key) - shard.RLock() - // See if element is within shard. - _, ok := shard.items[key] - shard.RUnlock() - return ok -} - -// Removes an element from the map. -func (m ConcurrentMap) Remove(key string) { - // Try to get shard. - shard := m.GetShard(key) - shard.Lock() - delete(shard.items, key) - shard.Unlock() -} - -// Removes an element from the map and returns it -func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) { - // Try to get shard. - shard := m.GetShard(key) - shard.Lock() - v, exists = shard.items[key] - delete(shard.items, key) - shard.Unlock() - return v, exists -} - -// Checks if map is empty. -func (m ConcurrentMap) IsEmpty() bool { - return m.Count() == 0 -} - -// Used by the Iter & IterBuffered functions to wrap two variables together over a channel, -type Tuple struct { - Key string - Val interface{} -} - -// Returns an iterator which could be used in a for range loop. -// -// Deprecated: using IterBuffered() will get a better performence -func (m ConcurrentMap) Iter() <-chan Tuple { - chans := snapshot(m) - ch := make(chan Tuple) - go fanIn(chans, ch) - return ch -} - -// Returns a buffered iterator which could be used in a for range loop. -func (m ConcurrentMap) IterBuffered() <-chan Tuple { - chans := snapshot(m) - total := 0 - for _, c := range chans { - total += cap(c) - } - ch := make(chan Tuple, total) - go fanIn(chans, ch) - return ch -} - -// Returns a array of channels that contains elements in each shard, -// which likely takes a snapshot of `m`. -// It returns once the size of each buffered channel is determined, -// before all the channels are populated using goroutines. -func snapshot(m ConcurrentMap) (chans []chan Tuple) { - chans = make([]chan Tuple, SHARD_COUNT) - wg := sync.WaitGroup{} - wg.Add(SHARD_COUNT) - // Foreach shard. - for index, shard := range m { - go func(index int, shard *ConcurrentMapShared) { - // Foreach key, value pair. - shard.RLock() - chans[index] = make(chan Tuple, len(shard.items)) - wg.Done() - for key, val := range shard.items { - chans[index] <- Tuple{key, val} - } - shard.RUnlock() - close(chans[index]) - }(index, shard) - } - wg.Wait() - return chans -} - -// fanIn reads elements from channels `chans` into channel `out` -func fanIn(chans []chan Tuple, out chan Tuple) { - wg := sync.WaitGroup{} - wg.Add(len(chans)) - for _, ch := range chans { - go func(ch chan Tuple) { - for t := range ch { - out <- t - } - wg.Done() - }(ch) - } - wg.Wait() - close(out) -} - -// Returns all items as map[string]interface{} -func (m ConcurrentMap) Items() map[string]interface{} { - tmp := make(map[string]interface{}) - - // Insert items to temporary map. - for item := range m.IterBuffered() { - tmp[item.Key] = item.Val - } - - return tmp -} - -// Iterator callback,called for every key,value found in -// maps. RLock is held for all calls for a given shard -// therefore callback sess consistent view of a shard, -// but not across the shards -type IterCb func(key string, v interface{}) - -// Callback based iterator, cheapest way to read -// all elements in a map. -func (m ConcurrentMap) IterCb(fn IterCb) { - for idx := range m { - shard := (m)[idx] - shard.RLock() - for key, value := range shard.items { - fn(key, value) - } - shard.RUnlock() - } -} - -// Return all keys as []string -func (m ConcurrentMap) Keys() []string { - count := m.Count() - ch := make(chan string, count) - go func() { - // Foreach shard. - wg := sync.WaitGroup{} - wg.Add(SHARD_COUNT) - for _, shard := range m { - go func(shard *ConcurrentMapShared) { - // Foreach key, value pair. - shard.RLock() - for key := range shard.items { - ch <- key - } - shard.RUnlock() - wg.Done() - }(shard) - } - wg.Wait() - close(ch) - }() - - // Generate keys - keys := make([]string, 0, count) - for k := range ch { - keys = append(keys, k) - } - return keys -} - -//Reviles ConcurrentMap "private" variables to json marshal. -func (m ConcurrentMap) MarshalJSON() ([]byte, error) { - // Create a temporary map, which will hold all item spread across shards. - tmp := make(map[string]interface{}) - - // Insert items to temporary map. - for item := range m.IterBuffered() { - tmp[item.Key] = item.Val - } - return json.Marshal(tmp) -} - -func fnv32(key string) uint32 { - hash := uint32(2166136261) - const prime32 = uint32(16777619) - for i := 0; i < len(key); i++ { - hash *= prime32 - hash ^= uint32(key[i]) - } - return hash -} - -// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal -// will probably won't know which to type to unmarshal into, in such case -// we'll end up with a value of type map[string]interface{}, In most cases this isn't -// out value type, this is why we've decided to remove this functionality. - -// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) { -// // Reverse process of Marshal. - -// tmp := make(map[string]interface{}) - -// // Unmarshal into a single map. -// if err := json.Unmarshal(b, &tmp); err != nil { -// return nil -// } - -// // foreach key,value pair in temporary map insert into our concurrent map. -// for key, val := range tmp { -// m.Set(key, val) -// } -// return nil -// } diff --git a/vendor/gopkg.in/robfig/cron.v2/LICENSE b/vendor/gopkg.in/robfig/cron.v2/LICENSE deleted file mode 100644 index 3a0f627ff..000000000 --- a/vendor/gopkg.in/robfig/cron.v2/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (C) 2012 Rob Figueiredo -All Rights Reserved. - -MIT LICENSE - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/gopkg.in/robfig/cron.v2/constantdelay.go b/vendor/gopkg.in/robfig/cron.v2/constantdelay.go deleted file mode 100644 index cd6e7b1be..000000000 --- a/vendor/gopkg.in/robfig/cron.v2/constantdelay.go +++ /dev/null @@ -1,27 +0,0 @@ -package cron - -import "time" - -// ConstantDelaySchedule represents a simple recurring duty cycle, e.g. "Every 5 minutes". -// It does not support jobs more frequent than once a second. -type ConstantDelaySchedule struct { - Delay time.Duration -} - -// Every returns a crontab Schedule that activates once every duration. -// Delays of less than a second are not supported (will round up to 1 second). -// Any fields less than a Second are truncated. -func Every(duration time.Duration) ConstantDelaySchedule { - if duration < time.Second { - duration = time.Second - } - return ConstantDelaySchedule{ - Delay: duration - time.Duration(duration.Nanoseconds())%time.Second, - } -} - -// Next returns the next time this should be run. -// This rounds so that the next activation time will be on the second. -func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time { - return t.Add(schedule.Delay - time.Duration(t.Nanosecond())*time.Nanosecond) -} diff --git a/vendor/gopkg.in/robfig/cron.v2/cron.go b/vendor/gopkg.in/robfig/cron.v2/cron.go deleted file mode 100644 index 62d2d839e..000000000 --- a/vendor/gopkg.in/robfig/cron.v2/cron.go +++ /dev/null @@ -1,236 +0,0 @@ -// Package cron implements a cron spec parser and runner. -package cron // import "gopkg.in/robfig/cron.v2" - -import ( - "sort" - "time" -) - -// Cron keeps track of any number of entries, invoking the associated func as -// specified by the schedule. It may be started, stopped, and the entries may -// be inspected while running. -type Cron struct { - entries []*Entry - stop chan struct{} - add chan *Entry - remove chan EntryID - snapshot chan []Entry - running bool - nextID EntryID -} - -// Job is an interface for submitted cron jobs. -type Job interface { - Run() -} - -// Schedule describes a job's duty cycle. -type Schedule interface { - // Next returns the next activation time, later than the given time. - // Next is invoked initially, and then each time the job is run. - Next(time.Time) time.Time -} - -// EntryID identifies an entry within a Cron instance -type EntryID int - -// Entry consists of a schedule and the func to execute on that schedule. -type Entry struct { - // ID is the cron-assigned ID of this entry, which may be used to look up a - // snapshot or remove it. - ID EntryID - - // Schedule on which this job should be run. - Schedule Schedule - - // Next time the job will run, or the zero time if Cron has not been - // started or this entry's schedule is unsatisfiable - Next time.Time - - // Prev is the last time this job was run, or the zero time if never. - Prev time.Time - - // Job is the thing to run when the Schedule is activated. - Job Job -} - -// Valid returns true if this is not the zero entry. -func (e Entry) Valid() bool { return e.ID != 0 } - -// byTime is a wrapper for sorting the entry array by time -// (with zero time at the end). -type byTime []*Entry - -func (s byTime) Len() int { return len(s) } -func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s byTime) Less(i, j int) bool { - // Two zero times should return false. - // Otherwise, zero is "greater" than any other time. - // (To sort it at the end of the list.) - if s[i].Next.IsZero() { - return false - } - if s[j].Next.IsZero() { - return true - } - return s[i].Next.Before(s[j].Next) -} - -// New returns a new Cron job runner. -func New() *Cron { - return &Cron{ - entries: nil, - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan []Entry), - remove: make(chan EntryID), - running: false, - } -} - -// FuncJob is a wrapper that turns a func() into a cron.Job -type FuncJob func() - -func (f FuncJob) Run() { f() } - -// AddFunc adds a func to the Cron to be run on the given schedule. -func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { - return c.AddJob(spec, FuncJob(cmd)) -} - -// AddJob adds a Job to the Cron to be run on the given schedule. -func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { - schedule, err := Parse(spec) - if err != nil { - return 0, err - } - return c.Schedule(schedule, cmd), nil -} - -// Schedule adds a Job to the Cron to be run on the given schedule. -func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { - c.nextID++ - entry := &Entry{ - ID: c.nextID, - Schedule: schedule, - Job: cmd, - } - if !c.running { - c.entries = append(c.entries, entry) - } else { - c.add <- entry - } - return entry.ID -} - -// Entries returns a snapshot of the cron entries. -func (c *Cron) Entries() []Entry { - if c.running { - c.snapshot <- nil - return <-c.snapshot - } - return c.entrySnapshot() -} - -// Entry returns a snapshot of the given entry, or nil if it couldn't be found. -func (c *Cron) Entry(id EntryID) Entry { - for _, entry := range c.Entries() { - if id == entry.ID { - return entry - } - } - return Entry{} -} - -// Remove an entry from being run in the future. -func (c *Cron) Remove(id EntryID) { - if c.running { - c.remove <- id - } else { - c.removeEntry(id) - } -} - -// Start the cron scheduler in its own go-routine. -func (c *Cron) Start() { - c.running = true - go c.run() -} - -// run the scheduler.. this is private just due to the need to synchronize -// access to the 'running' state variable. -func (c *Cron) run() { - // Figure out the next activation times for each entry. - now := time.Now().Local() - for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) - } - - for { - // Determine the next entry to run. - sort.Sort(byTime(c.entries)) - - var effective time.Time - if len(c.entries) == 0 || c.entries[0].Next.IsZero() { - // If there are no entries yet, just sleep - it still handles new entries - // and stop requests. - effective = now.AddDate(10, 0, 0) - } else { - effective = c.entries[0].Next - } - - select { - case now = <-time.After(effective.Sub(now)): - // Run every entry whose next time was this effective time. - for _, e := range c.entries { - if e.Next != effective { - break - } - go e.Job.Run() - e.Prev = e.Next - e.Next = e.Schedule.Next(effective) - } - continue - - case newEntry := <-c.add: - c.entries = append(c.entries, newEntry) - newEntry.Next = newEntry.Schedule.Next(now) - - case <-c.snapshot: - c.snapshot <- c.entrySnapshot() - - case id := <-c.remove: - c.removeEntry(id) - - case <-c.stop: - return - } - - now = time.Now().Local() - } -} - -// Stop the cron scheduler. -func (c *Cron) Stop() { - c.stop <- struct{}{} - c.running = false -} - -// entrySnapshot returns a copy of the current cron entry list. -func (c *Cron) entrySnapshot() []Entry { - var entries = make([]Entry, len(c.entries)) - for i, e := range c.entries { - entries[i] = *e - } - return entries -} - -func (c *Cron) removeEntry(id EntryID) { - var entries []*Entry - for _, e := range c.entries { - if e.ID != id { - entries = append(entries, e) - } - } - c.entries = entries -} diff --git a/vendor/gopkg.in/robfig/cron.v2/doc.go b/vendor/gopkg.in/robfig/cron.v2/doc.go deleted file mode 100644 index 31cd74a62..000000000 --- a/vendor/gopkg.in/robfig/cron.v2/doc.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Package cron implements a cron spec parser and job runner. - -Usage - -Callers may register Funcs to be invoked on a given schedule. Cron will run -them in their own goroutines. - - c := cron.New() - c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") }) - c.AddFunc("TZ=Asia/Tokyo 30 04 * * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") }) - c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) - c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") }) - c.Start() - .. - // Funcs are invoked in their own goroutine, asynchronously. - ... - // Funcs may also be added to a running Cron - c.AddFunc("@daily", func() { fmt.Println("Every day") }) - .. - // Inspect the cron job entries' next and previous run times. - inspect(c.Entries()) - .. - c.Stop() // Stop the scheduler (does not stop any jobs already running). - -CRON Expression Format - -A cron expression represents a set of times, using 6 space-separated fields. - - Field name | Mandatory? | Allowed values | Allowed special characters - ---------- | ---------- | -------------- | -------------------------- - Seconds | No | 0-59 | * / , - - Minutes | Yes | 0-59 | * / , - - Hours | Yes | 0-23 | * / , - - Day of month | Yes | 1-31 | * / , - ? - Month | Yes | 1-12 or JAN-DEC | * / , - - Day of week | Yes | 0-6 or SUN-SAT | * / , - ? - -Note: Month and Day-of-week field values are case insensitive. "SUN", "Sun", -and "sun" are equally accepted. - -Special Characters - -Asterisk ( * ) - -The asterisk indicates that the cron expression will match for all values of the -field; e.g., using an asterisk in the 5th field (month) would indicate every -month. - -Slash ( / ) - -Slashes are used to describe increments of ranges. For example 3-59/15 in the -1st field (minutes) would indicate the 3rd minute of the hour and every 15 -minutes thereafter. The form "*\/..." is equivalent to the form "first-last/...", -that is, an increment over the largest possible range of the field. The form -"N/..." is accepted as meaning "N-MAX/...", that is, starting at N, use the -increment until the end of that specific range. It does not wrap around. - -Comma ( , ) - -Commas are used to separate items of a list. For example, using "MON,WED,FRI" in -the 5th field (day of week) would mean Mondays, Wednesdays and Fridays. - -Hyphen ( - ) - -Hyphens are used to define ranges. For example, 9-17 would indicate every -hour between 9am and 5pm inclusive. - -Question mark ( ? ) - -Question mark may be used instead of '*' for leaving either day-of-month or -day-of-week blank. - -Predefined schedules - -You may use one of several pre-defined schedules in place of a cron expression. - - Entry | Description | Equivalent To - ----- | ----------- | ------------- - @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 * - @monthly | Run once a month, midnight, first of month | 0 0 0 1 * * - @weekly | Run once a week, midnight on Sunday | 0 0 0 * * 0 - @daily (or @midnight) | Run once a day, midnight | 0 0 0 * * * - @hourly | Run once an hour, beginning of hour | 0 0 * * * * - -Intervals - -You may also schedule a job to execute at fixed intervals. This is supported by -formatting the cron spec like this: - - @every - -where "duration" is a string accepted by time.ParseDuration -(http://golang.org/pkg/time/#ParseDuration). - -For example, "@every 1h30m10s" would indicate a schedule that activates every -1 hour, 30 minutes, 10 seconds. - -Note: The interval does not take the job runtime into account. For example, -if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, -it will have only 2 minutes of idle time between each run. - -Time zones - -By default, all interpretation and scheduling is done in the machine's local -time zone (as provided by the Go time package http://www.golang.org/pkg/time). -The time zone may be overridden by providing an additional space-separated field -at the beginning of the cron spec, of the form "TZ=Asia/Tokyo" - -Be aware that jobs scheduled during daylight-savings leap-ahead transitions will -not be run! - -Thread safety - -Since the Cron service runs concurrently with the calling code, some amount of -care must be taken to ensure proper synchronization. - -All cron methods are designed to be correctly synchronized as long as the caller -ensures that invocations have a clear happens-before ordering between them. - -Implementation - -Cron entries are stored in an array, sorted by their next activation time. Cron -sleeps until the next job is due to be run. - -Upon waking: - - it runs each entry that is active on that second - - it calculates the next run times for the jobs that were run - - it re-sorts the array of entries by next activation time. - - it goes to sleep until the soonest job. -*/ -package cron diff --git a/vendor/gopkg.in/robfig/cron.v2/parser.go b/vendor/gopkg.in/robfig/cron.v2/parser.go deleted file mode 100644 index a9e6f947a..000000000 --- a/vendor/gopkg.in/robfig/cron.v2/parser.go +++ /dev/null @@ -1,246 +0,0 @@ -package cron - -import ( - "fmt" - "log" - "math" - "strconv" - "strings" - "time" -) - -// Parse returns a new crontab schedule representing the given spec. -// It returns a descriptive error if the spec is not valid. -// -// It accepts -// - Full crontab specs, e.g. "* * * * * ?" -// - Descriptors, e.g. "@midnight", "@every 1h30m" -func Parse(spec string) (_ Schedule, err error) { - // Convert panics into errors - defer func() { - if recovered := recover(); recovered != nil { - err = fmt.Errorf("%v", recovered) - } - }() - - // Extract timezone if present - var loc = time.Local - if strings.HasPrefix(spec, "TZ=") { - i := strings.Index(spec, " ") - if loc, err = time.LoadLocation(spec[3:i]); err != nil { - log.Panicf("Provided bad location %s: %v", spec[3:i], err) - } - spec = strings.TrimSpace(spec[i:]) - } - - // Handle named schedules (descriptors) - if strings.HasPrefix(spec, "@") { - return parseDescriptor(spec, loc), nil - } - - // Split on whitespace. We require 5 or 6 fields. - // (second, optional) (minute) (hour) (day of month) (month) (day of week) - fields := strings.Fields(spec) - if len(fields) != 5 && len(fields) != 6 { - log.Panicf("Expected 5 or 6 fields, found %d: %s", len(fields), spec) - } - - // Add 0 for second field if necessary. - if len(fields) == 5 { - fields = append([]string{"0"}, fields...) - } - - schedule := &SpecSchedule{ - Second: getField(fields[0], seconds), - Minute: getField(fields[1], minutes), - Hour: getField(fields[2], hours), - Dom: getField(fields[3], dom), - Month: getField(fields[4], months), - Dow: getField(fields[5], dow), - Location: loc, - } - - return schedule, nil -} - -// getField returns an Int with the bits set representing all of the times that -// the field represents. A "field" is a comma-separated list of "ranges". -func getField(field string, r bounds) uint64 { - // list = range {"," range} - var bits uint64 - ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' }) - for _, expr := range ranges { - bits |= getRange(expr, r) - } - return bits -} - -// getRange returns the bits indicated by the given expression: -// number | number "-" number [ "/" number ] -func getRange(expr string, r bounds) uint64 { - var ( - start, end, step uint - rangeAndStep = strings.Split(expr, "/") - lowAndHigh = strings.Split(rangeAndStep[0], "-") - singleDigit = len(lowAndHigh) == 1 - extraStar uint64 - ) - if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { - start = r.min - end = r.max - extraStar = starBit - } else { - start = parseIntOrName(lowAndHigh[0], r.names) - switch len(lowAndHigh) { - case 1: - end = start - case 2: - end = parseIntOrName(lowAndHigh[1], r.names) - default: - log.Panicf("Too many hyphens: %s", expr) - } - } - - switch len(rangeAndStep) { - case 1: - step = 1 - case 2: - step = mustParseInt(rangeAndStep[1]) - - // Special handling: "N/step" means "N-max/step". - if singleDigit { - end = r.max - } - default: - log.Panicf("Too many slashes: %s", expr) - } - - if start < r.min { - log.Panicf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr) - } - if end > r.max { - log.Panicf("End of range (%d) above maximum (%d): %s", end, r.max, expr) - } - if start > end { - log.Panicf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr) - } - - return getBits(start, end, step) | extraStar -} - -// parseIntOrName returns the (possibly-named) integer contained in expr. -func parseIntOrName(expr string, names map[string]uint) uint { - if names != nil { - if namedInt, ok := names[strings.ToLower(expr)]; ok { - return namedInt - } - } - return mustParseInt(expr) -} - -// mustParseInt parses the given expression as an int or panics. -func mustParseInt(expr string) uint { - num, err := strconv.Atoi(expr) - if err != nil { - log.Panicf("Failed to parse int from %s: %s", expr, err) - } - if num < 0 { - log.Panicf("Negative number (%d) not allowed: %s", num, expr) - } - - return uint(num) -} - -// getBits sets all bits in the range [min, max], modulo the given step size. -func getBits(min, max, step uint) uint64 { - var bits uint64 - - // If step is 1, use shifts. - if step == 1 { - return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min) - } - - // Else, use a simple loop. - for i := min; i <= max; i += step { - bits |= 1 << i - } - return bits -} - -// all returns all bits within the given bounds. (plus the star bit) -func all(r bounds) uint64 { - return getBits(r.min, r.max, 1) | starBit -} - -// parseDescriptor returns a pre-defined schedule for the expression, or panics -// if none matches. -func parseDescriptor(spec string, loc *time.Location) Schedule { - switch spec { - case "@yearly", "@annually": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: 1 << months.min, - Dow: all(dow), - Location: loc, - } - - case "@monthly": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: all(months), - Dow: all(dow), - Location: loc, - } - - case "@weekly": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: all(dom), - Month: all(months), - Dow: 1 << dow.min, - Location: loc, - } - - case "@daily", "@midnight": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: all(dom), - Month: all(months), - Dow: all(dow), - Location: loc, - } - - case "@hourly": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: all(hours), - Dom: all(dom), - Month: all(months), - Dow: all(dow), - Location: loc, - } - } - - const every = "@every " - if strings.HasPrefix(spec, every) { - duration, err := time.ParseDuration(spec[len(every):]) - if err != nil { - log.Panicf("Failed to parse duration %s: %s", spec, err) - } - return Every(duration) - } - - log.Panicf("Unrecognized descriptor: %s", spec) - return nil -} diff --git a/vendor/gopkg.in/robfig/cron.v2/spec.go b/vendor/gopkg.in/robfig/cron.v2/spec.go deleted file mode 100644 index 3dfd3e088..000000000 --- a/vendor/gopkg.in/robfig/cron.v2/spec.go +++ /dev/null @@ -1,165 +0,0 @@ -package cron - -import "time" - -// SpecSchedule specifies a duty cycle (to the second granularity), based on a -// traditional crontab specification. It is computed initially and stored as bit sets. -type SpecSchedule struct { - Second, Minute, Hour, Dom, Month, Dow uint64 - Location *time.Location -} - -// bounds provides a range of acceptable values (plus a map of name to value). -type bounds struct { - min, max uint - names map[string]uint -} - -// The bounds for each field. -var ( - seconds = bounds{0, 59, nil} - minutes = bounds{0, 59, nil} - hours = bounds{0, 23, nil} - dom = bounds{1, 31, nil} - months = bounds{1, 12, map[string]uint{ - "jan": 1, - "feb": 2, - "mar": 3, - "apr": 4, - "may": 5, - "jun": 6, - "jul": 7, - "aug": 8, - "sep": 9, - "oct": 10, - "nov": 11, - "dec": 12, - }} - dow = bounds{0, 6, map[string]uint{ - "sun": 0, - "mon": 1, - "tue": 2, - "wed": 3, - "thu": 4, - "fri": 5, - "sat": 6, - }} -) - -const ( - // Set the top bit if a star was included in the expression. - starBit = 1 << 63 -) - -// Next returns the next time this schedule is activated, greater than the given -// time. If no time can be found to satisfy the schedule, return the zero time. -func (s *SpecSchedule) Next(t time.Time) time.Time { - // General approach: - // For Month, Day, Hour, Minute, Second: - // Check if the time value matches. If yes, continue to the next field. - // If the field doesn't match the schedule, then increment the field until it matches. - // While incrementing the field, a wrap-around brings it back to the beginning - // of the field list (since it is necessary to re-verify previous field - // values) - - // Convert the given time into the schedule's timezone. - // Save the original timezone so we can convert back after we find a time. - origLocation := t.Location() - t = t.In(s.Location) - - // Start at the earliest possible time (the upcoming second). - t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) - - // This flag indicates whether a field has been incremented. - added := false - - // If no time is found within five years, return zero. - yearLimit := t.Year() + 5 - -WRAP: - if t.Year() > yearLimit { - return time.Time{} - } - - // Find the first applicable month. - // If it's this month, then do nothing. - for 1< 0 - dowMatch bool = 1< 0 - ) - - if s.Dom&starBit > 0 || s.Dow&starBit > 0 { - return domMatch && dowMatch - } - return domMatch || dowMatch -}