Skip to content

Commit

Permalink
tidb-scheduler: implement kube-scheduler extender preempt verb (#2510)
Browse files Browse the repository at this point in the history
* Implement kube-scheduler extender preempt verb

* fix preemption bug in types

Co-authored-by: DanielZhangQD <[email protected]>
  • Loading branch information
cofyc and DanielZhangQD authored May 25, 2020
1 parent e413d49 commit dd1fc1c
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
{
"urlPrefix": "http://127.0.0.1:10262/scheduler",
"filterVerb": "filter",
"preemptVerb": "preempt",
"weight": 1,
"httpTimeout": 30000000000,
"enableHttps": false
Expand Down
6 changes: 5 additions & 1 deletion hack/local-up-operator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,15 @@ echo "info: installing crds"
$KUBECTL_BIN apply -f manifests/crd.yaml

echo "info: deploying tidb-operator"
KUBE_VERSION=$($KUBECTL_BIN version --short | awk '/Server Version:/ {print $3}')
helm_args=(
template
--kube-version "$KUBE_VERSION"
--name tidb-operator-dev
--namespace "$NAMESPACE"
--set operatorImage=$DOCKER_REGISTRY/pingcap/tidb-operator:${IMAGE_TAG}
--set-string operatorImage=$DOCKER_REGISTRY/pingcap/tidb-operator:${IMAGE_TAG}
--set-string controllerManager.logLevel=4
--set-string scheduler.logLevel=4
)

$HELM_BIN ${helm_args[@]} ./charts/tidb-operator/ | kubectl -n "$NAMESPACE" apply -f -
Expand Down
44 changes: 44 additions & 0 deletions pkg/scheduler/predicates/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package predicates

import (
v1 "k8s.io/api/core/v1"
)

type FakePredicate struct {
FakeName string
Nodes []v1.Node
Err error
}

var _ Predicate = &FakePredicate{}

func NewFakePredicate(name string, nodes []v1.Node, err error) *FakePredicate {
return &FakePredicate{}
}

func (f *FakePredicate) Name() string {
return f.FakeName
}

func (f *FakePredicate) Filter(_ string, _ *v1.Pod, nodes []v1.Node) ([]v1.Node, error) {
if f.Err != nil {
return nil, f.Err
}
if f.Nodes != nil {
return f.Nodes, nil
}
return nodes, nil
}
106 changes: 106 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ package scheduler

import (
"fmt"
"strings"

"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/features"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/scheduler/predicates"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
kubescheme "k8s.io/client-go/kubernetes/scheme"
eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1"
)

Expand All @@ -37,6 +40,9 @@ type Scheduler interface {
// expected to be a subset of the supplied list.
Filter(*schedulerapiv1.ExtenderArgs) (*schedulerapiv1.ExtenderFilterResult, error)

// Preempt implements scheduler extender preempt verb.
Preempt(args *schedulerapi.ExtenderPreemptionArgs) (*schedulerapi.ExtenderPreemptionResult, error)

// Prioritize based on extender-implemented priority functions. The returned scores & weight
// are used to compute the weighted score for an extender. The weighted scores are added to
// the scores computed by kubernetes scheduler. The total scores are used to do the host selection.
Expand All @@ -47,6 +53,7 @@ type scheduler struct {
// component => predicates
predicates map[string][]predicates.Predicate

kubeCli kubernetes.Interface
recorder record.EventRecorder
}

Expand All @@ -72,6 +79,7 @@ func NewScheduler(kubeCli kubernetes.Interface, cli versioned.Interface) Schedul
}
return &scheduler{
predicates: predicatesByComponent,
kubeCli: kubeCli,
recorder: recorder,
}
}
Expand Down Expand Up @@ -134,6 +142,104 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E
}, nil
}

// convertToNodeNameToMetaVictims converts from struct type to meta types.
func convertToNodeNameToMetaVictims(
nodeToVictims map[string]*schedulerapi.Victims,
) map[string]*schedulerapi.MetaVictims {
nodeNameToVictims := map[string]*schedulerapi.MetaVictims{}
for nodeName, victims := range nodeToVictims {
metaVictims := &schedulerapi.MetaVictims{
Pods: []*schedulerapi.MetaPod{},
}
for _, pod := range victims.Pods {
metaPod := &schedulerapi.MetaPod{
UID: string(pod.UID),
}
metaVictims.Pods = append(metaVictims.Pods, metaPod)
}
nodeNameToVictims[nodeName] = metaVictims
}
return nodeNameToVictims
}

// There is a bug in Kubernetes 1.16.0 and before that the JSON tag in
// v1.ExtenderPreemptionArg is wrong. We must use Kubernetes internal types.
// https://github.com/kubernetes/kubernetes/blob/v1.16.0/pkg/scheduler/api/v1/types.go#L270
// TODO use `k8s.io/kubernetes/pkg/scheduler/apis/extender/v1` in 1.17
// TODO use `k8s.io/kube-scheduler/extender/v1` since 1.18
func (s *scheduler) Preempt(args *schedulerapi.ExtenderPreemptionArgs) (*schedulerapi.ExtenderPreemptionResult, error) {
pod := args.Pod
ns := pod.GetNamespace()
podName := pod.GetName()

var instanceName string
var exist bool
if instanceName, exist = pod.Labels[label.InstanceLabelKey]; !exist {
klog.Warningf("can't find instanceName in pod labels: %s/%s", ns, podName)
return &schedulerapi.ExtenderPreemptionResult{
NodeNameToMetaVictims: convertToNodeNameToMetaVictims(args.NodeNameToVictims),
}, nil
}

component, ok := pod.Labels[label.ComponentLabelKey]
if !ok {
klog.Warningf("can't find component label in pod labels: %s/%s", ns, podName)
return &schedulerapi.ExtenderPreemptionResult{
NodeNameToMetaVictims: convertToNodeNameToMetaVictims(args.NodeNameToVictims),
}, nil
}

predicatesByComponent, ok := s.predicates[component]
if !ok {
klog.Warningf("no predicate for component %q, ignored", component)
return &schedulerapi.ExtenderPreemptionResult{
NodeNameToMetaVictims: convertToNodeNameToMetaVictims(args.NodeNameToVictims),
}, nil
}

allNodeNames := []string{}
for nodeName := range args.NodeNameToVictims {
allNodeNames = append(allNodeNames, nodeName)
}

klog.Infof("preempting for pod %s/%s, potential nodes: %s", ns, podName, strings.Join(allNodeNames, ","))

// extender Filter can't report the failed nodes are unresolvable or not,
// see https://github.com/kubernetes/kubernetes/issues/91281
// we need to filter out nodes in Preempt phase
kubeNodes := make([]apiv1.Node, 0, len(args.NodeNameToVictims))
for nodeName := range args.NodeNameToVictims {
// optimize this when we have performance issue
node, err := s.kubeCli.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
return nil, err
}
kubeNodes = append(kubeNodes, *node)
}
var err error
for _, predicate := range predicatesByComponent {
klog.Infof("entering preempt/predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes))
kubeNodes, err = predicate.Filter(instanceName, pod, kubeNodes)
klog.Infof("leaving preempt/predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes))
if err != nil {
return nil, err
}
}

feasibleNodeNameToVictims := map[string]*schedulerapi.Victims{}
for _, node := range kubeNodes {
if victims, ok := args.NodeNameToVictims[node.Name]; ok {
feasibleNodeNameToVictims[node.Name] = victims
} else {
return nil, fmt.Errorf("internal error: node %s does not found in args.NodeNameToVictims", node.Name)
}
}

return &schedulerapi.ExtenderPreemptionResult{
NodeNameToMetaVictims: convertToNodeNameToMetaVictims(feasibleNodeNameToVictims),
}, nil
}

// FailureError is returned when the FailTiDBSchedulerLabelKey is seen
type FailureError struct {
PodName string
Expand Down
Loading

0 comments on commit dd1fc1c

Please sign in to comment.