Skip to content

Commit

Permalink
Merge pull request #201 from arangodb/bugfix/operator-ready-state
Browse files Browse the repository at this point in the history
All operator Pods will now reach the Ready state.
  • Loading branch information
ewoutp authored Jun 26, 2018
2 parents a924758 + b415c7c commit cf338bd
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 11 deletions.
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import (
"github.com/rs/zerolog"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
appsv1beta2 "k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -271,5 +273,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
log.Info().Msgf(format, args...)
})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
combinedScheme := runtime.NewScheme()
scheme.AddToScheme(combinedScheme)
v1.AddToScheme(combinedScheme)
appsv1beta2.AddToScheme(combinedScheme)
return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
}
4 changes: 2 additions & 2 deletions manifests/templates/deployment-replication/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ rules:
resources: ["nodes"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["*"]
resources: ["deployments", "replicasets"]
verbs: ["get"]

---

Expand Down
4 changes: 2 additions & 2 deletions manifests/templates/deployment/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ rules:
resources: ["nodes"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["*"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list"]
Expand Down
3 changes: 3 additions & 0 deletions manifests/templates/storage/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ rules:
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["*"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["*"]
Expand Down
6 changes: 3 additions & 3 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
// Run the operator
func (o *Operator) Run() {
if o.Config.EnableDeployment {
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment)
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe)
}
if o.Config.EnableDeploymentReplication {
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication)
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe)
}
if o.Config.EnableStorage {
go o.runLeaderElection("arango-storage-operator", o.onStartStorage)
go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe)
}
// Wait until process terminates
<-context.TODO().Done()
Expand Down
71 changes: 68 additions & 3 deletions pkg/operator/operator_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,37 @@
package operator

import (
"fmt"
"os"
"time"

"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/probe"
)

func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
// runLeaderElection performs a leader election on a lock with given name in
// the namespace that the operator is deployed in.
// When the leader election is won, the given callback is called.
// When the leader election was won once, but the leadership is then lost, the process is killed.
// The given ready probe is set, as soon as this process became the leader, or a new leader
// is detected.
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
namespace := o.Config.Namespace
kubecli := o.Dependencies.KubeCli
log := o.log.With().Str("lock-name", lockName).Logger()
eventTarget := o.getLeaderElectionEventTarget(log)
recordEvent := func(reason, message string) {
if eventTarget != nil {
o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
}
}
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
namespace,
lockName,
Expand All @@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: onStart,
OnStartedLeading: func(stop <-chan struct{}) {
recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
readyProbe.SetReady()
onStart(stop)
},
OnStoppedLeading: func() {
log.Info().Msg("Leader election lost")
recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
log.Info().Msg("Stop leading. Terminating process")
os.Exit(1)
},
OnNewLeader: func(identity string) {
log.Info().Str("identity", identity).Msg("New leader detected")
readyProbe.SetReady()
},
},
})
}

// getLeaderElectionEventTarget returns the object that leader election related
// events will be added to.
//
// It fetches the Pod named o.Config.PodName in o.Config.Namespace and then
// walks up the ownership chain Pod -> ReplicaSet -> Deployment. The highest
// owner that can be resolved is returned:
//   - nil when the Pod itself cannot be fetched,
//   - the Pod when no owning ReplicaSet can be resolved,
//   - the ReplicaSet when no owning Deployment can be resolved,
//   - the Deployment otherwise.
//
// Failures are logged and never returned to the caller.
func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
	ns := o.Config.Namespace
	kubecli := o.Dependencies.KubeCli
	pods := kubecli.CoreV1().Pods(ns)
	log = log.With().Str("pod-name", o.Config.PodName).Logger()
	pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
	if err != nil {
		log.Error().Err(err).Msg("Cannot find Pod containing this operator")
		return nil
	}
	rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
	if err != nil {
		log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
		return pod
	}
	if rSet == nil {
		log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
		return pod
	}
	log = log.With().Str("replicaSet-name", rSet.Name).Logger()
	depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
	if err != nil {
		log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator")
		return rSet
	}
	// The Deployment (not the ReplicaSet) must be checked here. Returning a nil
	// *Deployment as a runtime.Object would yield a non-nil interface value, so
	// callers comparing the result against nil would treat it as a valid target.
	if depl == nil {
		log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
		return rSet
	}
	return depl
}
62 changes: 62 additions & 0 deletions pkg/util/k8sutil/owner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package k8sutil

import (
"k8s.io/api/apps/v1beta2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// GetPodOwner looks up the ReplicaSet that owns the given Pod.
// The first owner reference of kind "ReplicaSet" is fetched from the Pod's own
// namespace; the ns argument is not consulted.
// If the Pod has no owner reference of kind "ReplicaSet", (nil, nil) is returned.
// If fetching the ReplicaSet fails, the error is passed through maskAny and returned.
func GetPodOwner(kubecli kubernetes.Interface, pod *v1.Pod, ns string) (*v1beta2.ReplicaSet, error) {
	for _, owner := range pod.GetOwnerReferences() {
		if owner.Kind != "ReplicaSet" {
			continue
		}
		// Only the first matching reference is resolved.
		replicaSet, err := kubecli.AppsV1beta2().ReplicaSets(pod.GetNamespace()).Get(owner.Name, metav1.GetOptions{})
		if err != nil {
			return nil, maskAny(err)
		}
		return replicaSet, nil
	}
	return nil, nil
}

// GetReplicaSetOwner looks up the Deployment that owns the given ReplicaSet.
// The first owner reference of kind "Deployment" is fetched from the
// ReplicaSet's own namespace; the ns argument is not consulted.
// If the ReplicaSet has no owner reference of kind "Deployment", (nil, nil) is returned.
// If fetching the Deployment fails, the error is passed through maskAny and returned.
func GetReplicaSetOwner(kubecli kubernetes.Interface, rSet *v1beta2.ReplicaSet, ns string) (*v1beta2.Deployment, error) {
	for _, owner := range rSet.GetOwnerReferences() {
		if owner.Kind != "Deployment" {
			continue
		}
		// Only the first matching reference is resolved.
		deployment, err := kubecli.AppsV1beta2().Deployments(rSet.GetNamespace()).Get(owner.Name, metav1.GetOptions{})
		if err != nil {
			return nil, maskAny(err)
		}
		return deployment, nil
	}
	return nil, nil
}

0 comments on commit cf338bd

Please sign in to comment.