From dd1fc1cfac0cad27785b3920a589a9048edb1f1b Mon Sep 17 00:00:00 2001
From: Yecheng Fu
Date: Mon, 25 May 2020 21:31:44 +0800
Subject: [PATCH] tidb-scheduler: implement kube-scheduler extender preempt verb (#2510)

* Implement kube-scheduler extender preempt verb

* fix preemption bug in types

Co-authored-by: DanielZhangQD <36026334+DanielZhangQD@users.noreply.github.com>
---
 .../config/_scheduler-policy-json.tpl |   1 +
 hack/local-up-operator.sh             |   6 +-
 pkg/scheduler/predicates/fake.go      |  45 ++++
 pkg/scheduler/scheduler.go            | 106 ++++++++
 pkg/scheduler/scheduler_test.go       | 245 +++++++++++++++---
 pkg/scheduler/server/mux.go           |  28 ++
 6 files changed, 392 insertions(+), 39 deletions(-)
 create mode 100644 pkg/scheduler/predicates/fake.go

diff --git a/charts/tidb-operator/templates/config/_scheduler-policy-json.tpl b/charts/tidb-operator/templates/config/_scheduler-policy-json.tpl
index 824c1e67b5d..af577369dd5 100644
--- a/charts/tidb-operator/templates/config/_scheduler-policy-json.tpl
+++ b/charts/tidb-operator/templates/config/_scheduler-policy-json.tpl
@@ -34,6 +34,7 @@
     {
       "urlPrefix": "http://127.0.0.1:10262/scheduler",
       "filterVerb": "filter",
+      "preemptVerb": "preempt",
       "weight": 1,
       "httpTimeout": 30000000000,
       "enableHttps": false
diff --git a/hack/local-up-operator.sh b/hack/local-up-operator.sh
index 2d8bdac03a1..51be3832da9 100755
--- a/hack/local-up-operator.sh
+++ b/hack/local-up-operator.sh
@@ -169,11 +169,15 @@
 echo "info: installing crds"
 $KUBECTL_BIN apply -f manifests/crd.yaml
 
 echo "info: deploying tidb-operator"
+KUBE_VERSION=$($KUBECTL_BIN version --short | awk '/Server Version:/ {print $3}')
 helm_args=(
     template
+    --kube-version "$KUBE_VERSION"
     --name tidb-operator-dev
     --namespace "$NAMESPACE"
-    --set operatorImage=$DOCKER_REGISTRY/pingcap/tidb-operator:${IMAGE_TAG}
+    --set-string operatorImage=$DOCKER_REGISTRY/pingcap/tidb-operator:${IMAGE_TAG}
+    --set-string controllerManager.logLevel=4
+    --set-string scheduler.logLevel=4
 )
 $HELM_BIN ${helm_args[@]} ./charts/tidb-operator/ | kubectl -n "$NAMESPACE" apply -f -
diff --git a/pkg/scheduler/predicates/fake.go b/pkg/scheduler/predicates/fake.go
new file mode 100644
index 00000000000..11b53fc7c1b
--- /dev/null
+++ b/pkg/scheduler/predicates/fake.go
@@ -0,0 +1,45 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package predicates
+
+import (
+	v1 "k8s.io/api/core/v1"
+)
+
+type FakePredicate struct {
+	FakeName string
+	Nodes    []v1.Node
+	Err      error
+}
+
+var _ Predicate = &FakePredicate{}
+
+func NewFakePredicate(name string, nodes []v1.Node, err error) *FakePredicate {
+	return &FakePredicate{FakeName: name, Nodes: nodes, Err: err}
+}
+
+func (f *FakePredicate) Name() string {
+	return f.FakeName
+}
+
+func (f *FakePredicate) Filter(_ string, _ *v1.Pod, nodes []v1.Node) ([]v1.Node, error) {
+	if f.Err != nil {
+		return nil, f.Err
+	}
+	if f.Nodes != nil {
+		return f.Nodes, nil
+	}
+	return nodes, nil
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 6a5f35a33a9..65aaaa38b41 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -15,17 +15,20 @@
 package scheduler
 
 import (
 	"fmt"
+	"strings"
 
 	"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
 	"github.com/pingcap/tidb-operator/pkg/features"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	"github.com/pingcap/tidb-operator/pkg/scheduler/predicates"
 	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	kubescheme "k8s.io/client-go/kubernetes/scheme"
 	eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog"
+	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1"
 )
 
@@ -37,6 +40,9 @@ type Scheduler interface {
 	// expected to be a subset of the supplied list.
 	Filter(*schedulerapiv1.ExtenderArgs) (*schedulerapiv1.ExtenderFilterResult, error)
 
+	// Preempt implements the scheduler extender preempt verb.
+	Preempt(args *schedulerapi.ExtenderPreemptionArgs) (*schedulerapi.ExtenderPreemptionResult, error)
+
 	// Prioritize based on extender-implemented priority functions. The returned scores & weight
 	// are used to compute the weighted score for an extender. The weighted scores are added to
 	// the scores computed by kubernetes scheduler. The total scores are used to do the host selection.
@@ -47,6 +53,7 @@ type scheduler struct {
 	// component => predicates
 	predicates map[string][]predicates.Predicate
 
+	kubeCli  kubernetes.Interface
 	recorder record.EventRecorder
 }
 
@@ -72,6 +79,7 @@ func NewScheduler(kubeCli kubernetes.Interface, cli versioned.Interface) Schedul
 	}
 	return &scheduler{
 		predicates: predicatesByComponent,
+		kubeCli:    kubeCli,
 		recorder:   recorder,
 	}
 }
@@ -134,6 +142,104 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E
 	}, nil
 }
 
+// convertToNodeNameToMetaVictims converts from struct type to meta types.
+func convertToNodeNameToMetaVictims(
+	nodeToVictims map[string]*schedulerapi.Victims,
+) map[string]*schedulerapi.MetaVictims {
+	nodeNameToVictims := map[string]*schedulerapi.MetaVictims{}
+	for nodeName, victims := range nodeToVictims {
+		metaVictims := &schedulerapi.MetaVictims{
+			Pods: []*schedulerapi.MetaPod{},
+		}
+		for _, pod := range victims.Pods {
+			metaPod := &schedulerapi.MetaPod{
+				UID: string(pod.UID),
+			}
+			metaVictims.Pods = append(metaVictims.Pods, metaPod)
+		}
+		nodeNameToVictims[nodeName] = metaVictims
+	}
+	return nodeNameToVictims
+}
+
+// There is a bug in Kubernetes 1.16.0 and earlier: the JSON tags in
+// v1.ExtenderPreemptionArgs are wrong, so we must use the Kubernetes internal types.
+// https://github.com/kubernetes/kubernetes/blob/v1.16.0/pkg/scheduler/api/v1/types.go#L270
+// TODO use `k8s.io/kubernetes/pkg/scheduler/apis/extender/v1` in 1.17
+// TODO use `k8s.io/kube-scheduler/extender/v1` since 1.18
+func (s *scheduler) Preempt(args *schedulerapi.ExtenderPreemptionArgs) (*schedulerapi.ExtenderPreemptionResult, error) {
+	pod := args.Pod
+	ns := pod.GetNamespace()
+	podName := pod.GetName()
+
+	var instanceName string
+	var exist bool
+	if instanceName, exist = pod.Labels[label.InstanceLabelKey]; !exist {
+		klog.Warningf("can't find instanceName in pod labels: %s/%s", ns, podName)
+		return &schedulerapi.ExtenderPreemptionResult{
+			NodeNameToMetaVictims: convertToNodeNameToMetaVictims(args.NodeNameToVictims),
+		}, nil
+	}
+
+	component, ok := pod.Labels[label.ComponentLabelKey]
+	if !ok {
+		klog.Warningf("can't find component label in pod labels: %s/%s", ns, podName)
+		return &schedulerapi.ExtenderPreemptionResult{
+			NodeNameToMetaVictims: convertToNodeNameToMetaVictims(args.NodeNameToVictims),
+		}, nil
+	}
+
+	predicatesByComponent, ok := s.predicates[component]
+	if !ok {
+		klog.Warningf("no predicate for component %q, ignored", component)
+		return &schedulerapi.ExtenderPreemptionResult{
+			NodeNameToMetaVictims: convertToNodeNameToMetaVictims(args.NodeNameToVictims),
+		}, nil
+	}
+
+	allNodeNames := []string{}
+	for nodeName := range args.NodeNameToVictims {
+		allNodeNames = append(allNodeNames, nodeName)
+	}
+
+	klog.Infof("preempting for pod %s/%s, potential nodes: %s", ns, podName, strings.Join(allNodeNames, ","))
+
+	// The extender Filter verb can't report whether failed nodes are unresolvable
+	// (see https://github.com/kubernetes/kubernetes/issues/91281), so we must run
+	// the predicates again in the Preempt phase.
+	kubeNodes := make([]apiv1.Node, 0, len(args.NodeNameToVictims))
+	for nodeName := range args.NodeNameToVictims {
+		// TODO: optimize this if it becomes a performance issue
+		node, err := s.kubeCli.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		if err != nil {
+			return nil, err
+		}
+		kubeNodes = append(kubeNodes, *node)
+	}
+	var err error
+	for _, predicate := range predicatesByComponent {
+		klog.Infof("entering preempt/predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes))
+		kubeNodes, err = predicate.Filter(instanceName, pod, kubeNodes)
+		klog.Infof("leaving preempt/predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes))
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	feasibleNodeNameToVictims := map[string]*schedulerapi.Victims{}
+	for _, node := range kubeNodes {
+		if victims, ok := args.NodeNameToVictims[node.Name]; ok {
+			feasibleNodeNameToVictims[node.Name] = victims
+		} else {
+			return nil, fmt.Errorf("internal error: node %s not found in args.NodeNameToVictims", node.Name)
+		}
+	}
+
+	return &schedulerapi.ExtenderPreemptionResult{
+		NodeNameToMetaVictims: convertToNodeNameToMetaVictims(feasibleNodeNameToVictims),
+	}, nil
+}
+
 // FailureError is returned when the FailTiDBSchedulerLabelKey is seen
 type FailureError struct {
 	PodName string
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 15ef5e5eb16..ad93be7943f 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -15,25 +15,29 @@
 package scheduler
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 
+	"github.com/google/go-cmp/cmp"
"github.com/onsi/gomega" "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/pkg/scheduler/predicates" apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" + schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1" ) func TestSchedulerFilter(t *testing.T) { g := NewGomegaWithT(t) type testcase struct { - name string - args *schedulerapiv1.ExtenderArgs - predicateError bool - expectFn func(*GomegaWithT, *schedulerapiv1.ExtenderFilterResult, error) + name string + args *schedulerapiv1.ExtenderArgs + predicate predicates.Predicate + expectFn func(*GomegaWithT, *schedulerapiv1.ExtenderFilterResult, error) } recorder := record.NewFakeRecorder(10) @@ -44,25 +48,18 @@ func TestSchedulerFilter(t *testing.T) { s := &scheduler{ predicates: map[string][]predicates.Predicate{ label.PDLabelVal: { - newFakeErrPredicate(), + test.predicate, }, label.TiKVLabelVal: { - newFakeErrPredicate(), + test.predicate, }, label.TiDBLabelVal: { - newFakeErrPredicate(), + test.predicate, }, }, recorder: recorder, } - if test.predicateError { - for _, predicatesByComponent := range s.predicates { - for _, predicate := range predicatesByComponent { - predicate.(*fakeErrPredicate).SetError(fmt.Errorf("predicate error")) - } - } - } re, err := s.Filter(test.args) test.expectFn(g, re, err) } @@ -84,7 +81,7 @@ func TestSchedulerFilter(t *testing.T) { Items: []apiv1.Node{}, }, }, - predicateError: false, + predicate: &predicates.FakePredicate{}, expectFn: func(g *GomegaWithT, result *schedulerapiv1.ExtenderFilterResult, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(result.Nodes.ResourceVersion).To(Equal("9999")) @@ -111,7 +108,7 @@ func TestSchedulerFilter(t *testing.T) { Items: []apiv1.Node{}, }, }, - predicateError: false, + predicate: &predicates.FakePredicate{}, expectFn: func(g *GomegaWithT, result *schedulerapiv1.ExtenderFilterResult, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(result.Nodes.ResourceVersion).To(Equal("9999")) @@ -138,7 +135,7 @@ func TestSchedulerFilter(t *testing.T) { Items: []apiv1.Node{}, }, }, - predicateError: true, + predicate: &predicates.FakePredicate{Err: fmt.Errorf("predicate error")}, expectFn: func(g *GomegaWithT, result *schedulerapiv1.ExtenderFilterResult, err error) { g.Expect(err).NotTo(HaveOccurred()) events := predicates.CollectEvents(recorder.Events) @@ -174,7 +171,7 @@ func TestSchedulerFilter(t *testing.T) { }, }, }, - predicateError: false, + predicate: &predicates.FakePredicate{}, expectFn: func(g *GomegaWithT, result *schedulerapiv1.ExtenderFilterResult, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(result.Nodes.Items[0].Name).To(Equal("node-1")) @@ -275,26 +272,198 @@ func TestSchedulerPriority(t *testing.T) { } } -type fakeErrPredicate struct { - err error -} - -func newFakeErrPredicate() *fakeErrPredicate { - return &fakeErrPredicate{} -} - -func (fep *fakeErrPredicate) SetError(err error) { - fep.err = err -} - -func (fep *fakeErrPredicate) Name() string { - return "fakeErrPredicate" -} - -func (fep *fakeErrPredicate) Filter(_ string, _ *apiv1.Pod, nodes []apiv1.Node) ([]apiv1.Node, error) { - if fep.err != nil { - return nil, fep.err +func TestSchedulerPreempt(t *testing.T) { + victims := &schedulerapi.Victims{ + Pods: []*apiv1.Pod{}, + } + metaVictims := &schedulerapi.MetaVictims{ + Pods: []*schedulerapi.MetaPod{}, + } + nodeA := &apiv1.Node{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: "node-a", + }, + } + nodeB := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-b", + }, + } + nodeC := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-c", + }, + } + unrelatedPod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: apiv1.NamespaceDefault, + Name: "test", + }, + } + pdPod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: apiv1.NamespaceDefault, + Name: "test", + Labels: map[string]string{ + label.InstanceLabelKey: "test", + label.ComponentLabelKey: "pd", + }, + }, + } + tests := []struct { + name string + nodes []*apiv1.Node + args *schedulerapi.ExtenderPreemptionArgs + predicates map[string][]predicates.Predicate + wantResult *schedulerapi.ExtenderPreemptionResult + wantErr string + }{ + { + name: "unrelated pod", + nodes: []*apiv1.Node{nodeA, nodeB, nodeC}, + args: &schedulerapi.ExtenderPreemptionArgs{ + Pod: unrelatedPod, + NodeNameToVictims: map[string]*schedulerapi.Victims{ + "node-a": victims, + "node-b": victims, + "node-c": victims, + }, + }, + wantResult: &schedulerapi.ExtenderPreemptionResult{ + NodeNameToMetaVictims: map[string]*schedulerapi.MetaVictims{ + "node-a": metaVictims, + "node-b": metaVictims, + "node-c": metaVictims, + }, + }, + }, + { + name: "node does not exist anymore", + nodes: []*apiv1.Node{nodeA, nodeB}, + predicates: map[string][]predicates.Predicate{ + label.PDLabelVal: { + &predicates.FakePredicate{}, + }, + }, + args: &schedulerapi.ExtenderPreemptionArgs{ + Pod: pdPod, + NodeNameToVictims: map[string]*schedulerapi.Victims{ + "node-a": victims, + "node-b": victims, + "node-c": victims, + }, + }, + wantResult: nil, + wantErr: `nodes "node-c" not found`, + }, + { + name: "one of nominated nodes is feasible", + nodes: []*apiv1.Node{nodeA, nodeB, nodeC}, + predicates: map[string][]predicates.Predicate{ + label.PDLabelVal: { + &predicates.FakePredicate{ + Nodes: []apiv1.Node{ + *nodeA, + }, + }, + }, + }, + args: &schedulerapi.ExtenderPreemptionArgs{ + Pod: pdPod, + NodeNameToVictims: map[string]*schedulerapi.Victims{ + "node-a": victims, + "node-b": victims, + "node-c": victims, + }, + }, + wantResult: &schedulerapi.ExtenderPreemptionResult{ + NodeNameToMetaVictims: map[string]*schedulerapi.MetaVictims{ + "node-a": metaVictims, + }, + }, + }, + { + name: "none of nominated nodes is feasible", + nodes: []*apiv1.Node{nodeA, nodeB, nodeC}, + predicates: map[string][]predicates.Predicate{ + label.PDLabelVal: { + &predicates.FakePredicate{ + Nodes: []apiv1.Node{}, + }, + }, + }, + args: &schedulerapi.ExtenderPreemptionArgs{ + Pod: pdPod, + NodeNameToVictims: map[string]*schedulerapi.Victims{ + "node-a": victims, + "node-b": victims, + "node-c": victims, + }, + }, + wantResult: &schedulerapi.ExtenderPreemptionResult{ + NodeNameToMetaVictims: map[string]*schedulerapi.MetaVictims{}, + }, + }, + { + name: "all nominated nodes are feasible", + nodes: []*apiv1.Node{nodeA, nodeB, nodeC}, + predicates: map[string][]predicates.Predicate{ + label.PDLabelVal: { + &predicates.FakePredicate{ + Nodes: []apiv1.Node{ + *nodeA, + *nodeB, + *nodeC, + }, + }, + }, + }, + args: &schedulerapi.ExtenderPreemptionArgs{ + Pod: pdPod, + NodeNameToVictims: map[string]*schedulerapi.Victims{ + "node-a": victims, + "node-b": victims, + "node-c": victims, + }, + }, + wantResult: &schedulerapi.ExtenderPreemptionResult{ + NodeNameToMetaVictims: map[string]*schedulerapi.MetaVictims{ + "node-a": metaVictims, + "node-b": metaVictims, + "node-c": metaVictims, + }, + }, + }, } - return nodes, nil + for _, tt 
+		t.Run(tt.name, func(t *testing.T) {
+			kubeCli := fake.NewSimpleClientset()
+			s := &scheduler{
+				kubeCli:  kubeCli,
+				recorder: record.NewFakeRecorder(10),
+			}
+			if tt.predicates != nil {
+				s.predicates = tt.predicates
+			}
+			if tt.nodes != nil {
+				for _, node := range tt.nodes {
+					kubeCli.CoreV1().Nodes().Create(node)
+				}
+			}
+			kubeCli.CoreV1().Pods(apiv1.NamespaceDefault).Create(tt.args.Pod)
+			result, err := s.Preempt(tt.args)
+			if diff := cmp.Diff(tt.wantResult, result); diff != "" {
+				t.Errorf("unexpected result (-want, +got): %s", diff)
+			}
+			if tt.wantErr == "" && err != nil {
+				t.Errorf("expected no error, got %v", err)
+			}
+			if tt.wantErr != "" {
+				if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
+					t.Errorf("expected error containing %q, got %v", tt.wantErr, err)
+				}
+			}
+		})
+	}
 }
diff --git a/pkg/scheduler/server/mux.go b/pkg/scheduler/server/mux.go
index 85ab686f8e6..84c0f479ee2 100644
--- a/pkg/scheduler/server/mux.go
+++ b/pkg/scheduler/server/mux.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/tidb-operator/pkg/scheduler"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
+	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1"
 )
 
@@ -52,6 +53,11 @@ func StartServer(kubeCli kubernetes.Interface, cli versioned.Interface, port int
 		Operation("filterNodes").
 		Writes(schedulerapiv1.ExtenderFilterResult{}))
 
+	ws.Route(ws.POST("/preempt").To(svr.preemptNode).
+		Doc("preempt nodes").
+		Operation("preemptNodes").
+		Writes(schedulerapi.ExtenderPreemptionResult{}))
+
 	ws.Route(ws.POST("/prioritize").To(svr.prioritizeNode).
 		Doc("prioritize nodes").
 		Operation("prioritizeNodes").
@@ -84,6 +90,28 @@ func (svr *server) filterNode(req *restful.Request, resp *restful.Response) {
 	}
 }
 
+func (svr *server) preemptNode(req *restful.Request, resp *restful.Response) {
+	svr.lock.Lock()
+	defer svr.lock.Unlock()
+
+	args := &schedulerapi.ExtenderPreemptionArgs{}
+	if err := req.ReadEntity(args); err != nil {
+		errorResponse(resp, errFailToRead)
+		return
+	}
+
+	preemptResult, err := svr.scheduler.Preempt(args)
+	if err != nil {
+		errorResponse(resp, restful.NewError(http.StatusInternalServerError,
+			fmt.Sprintf("unable to preempt nodes: %v", err)))
+		return
+	}
+
+	if err := resp.WriteEntity(preemptResult); err != nil {
+		errorResponse(resp, errFailToWrite)
+	}
+}
+
 func (svr *server) prioritizeNode(req *restful.Request, resp *restful.Response) {
 	args := &schedulerapiv1.ExtenderArgs{}
 	if err := req.ReadEntity(args); err != nil {
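
For a quick manual check of the new verb, the sketch below POSTs an ExtenderPreemptionArgs payload to the extender and prints the nodes that survive the component's predicates. This client is illustrative only and not part of the patch: it assumes the tidb-scheduler extender is reachable at http://127.0.0.1:10262/scheduler (the urlPrefix configured in the policy template above), and the pod name demo-pd-0 and node name node-a are made up. It marshals the Kubernetes internal types, which is exactly what the server reads, per the JSON-tag bug noted in scheduler.go.

// smoke_preempt.go: hand-rolled client for the extender's preempt verb.
// Illustrative only; endpoint, pod, and node names are assumptions.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

func main() {
	// A PD pod and one candidate node with an empty victim list, mirroring
	// what kube-scheduler would send after its own preemption dry run.
	args := &schedulerapi.ExtenderPreemptionArgs{
		Pod: &apiv1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: "default",
				Name:      "demo-pd-0", // hypothetical PD pod
				Labels: map[string]string{
					"app.kubernetes.io/instance":  "demo", // label.InstanceLabelKey
					"app.kubernetes.io/component": "pd",   // label.ComponentLabelKey
				},
			},
		},
		NodeNameToVictims: map[string]*schedulerapi.Victims{
			"node-a": {Pods: []*apiv1.Pod{}},
		},
	}

	body, err := json.Marshal(args)
	if err != nil {
		panic(err)
	}

	resp, err := http.Post("http://127.0.0.1:10262/scheduler/preempt", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Nodes that survive the extender's predicates come back keyed by name.
	result := &schedulerapi.ExtenderPreemptionResult{}
	if err := json.NewDecoder(resp.Body).Decode(result); err != nil {
		panic(err)
	}
	fmt.Printf("feasible nodes after preemption filtering: %v\n", result.NodeNameToMetaVictims)
}

Against a running extender, the response carries the same NodeNameToMetaVictims shape the tests above assert on; kube-scheduler then chooses its preemption target from the nodes that remain.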