Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control pod routing in service objects with label #181

Merged
merged 9 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ E2E_TEST_DIRS?=tests/e2e
else
E2E_TEST_DIRS?=tests/e2e tests/e2e-extra
endif
# Additional arguments to pass to 'kubectl kuttl'
E2E_ADDITIONAL_ARGS?=

# Specify how to deploy the operator. Allowable values are 'helm', 'olm' or 'random'.
# When deploying with olm, it is expected that `make setup-olm` has been run
Expand Down Expand Up @@ -246,7 +248,7 @@ run-int-tests: install-kuttl-plugin vdb-gen setup-e2e-communal ## Run the integr
ifeq ($(DEPLOY_WITH), $(filter $(DEPLOY_WITH), olm random))
$(MAKE) setup-olm
endif
kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) $(E2E_TEST_DIRS)
kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) $(E2E_ADDITIONAL_ARGS) $(E2E_TEST_DIRS)

.PHONY: run-online-upgrade-tests
run-online-upgrade-tests: install-kuttl-plugin setup-e2e-communal ## Run integration tests that only work on Vertica 11.1+ server
Expand All @@ -256,7 +258,7 @@ endif
ifeq ($(BASE_VERTICA_IMG), <not-set>)
$(error $$BASE_VERTICA_IMG not set)
endif
kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) tests/e2e-online-upgrade/
kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) $(E2E_ADDITIONAL_ARGS) tests/e2e-online-upgrade/

.PHONY: run-soak-tests
run-soak-tests: install-kuttl-plugin kuttl-step-gen ## Run the soak tests
Expand Down
10 changes: 10 additions & 0 deletions api/v1beta1/verticadb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,16 @@ func MakeVDBName() types.NamespacedName {
return types.NamespacedName{Name: "vertica-sample", Namespace: "default"}
}

// FindTransientSubcluster scans the subclusters in the spec and returns a
// pointer to the first one that has IsTransient set. The pointer refers to the
// element stored in the spec, not a copy. Returns nil when no subcluster is
// marked as transient.
func (v *VerticaDB) FindTransientSubcluster() *Subcluster {
	subclusters := v.Spec.Subclusters
	for idx := range subclusters {
		if candidate := &subclusters[idx]; candidate.IsTransient {
			return candidate
		}
	}
	return nil
}

// MakeVDB is a helper that constructs a fully formed VerticaDB struct using the sample name.
// This is intended for test purposes.
func MakeVDB() *VerticaDB {
Expand Down
24 changes: 23 additions & 1 deletion api/v1beta1/verticadb_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (v *VerticaDB) validateVerticaDBSpec() field.ErrorList {
allErrs = v.hasValidKerberosSetup(allErrs)
allErrs = v.hasValidTemporarySubclusterRouting(allErrs)
allErrs = v.matchingServiceNamesAreConsistent(allErrs)
allErrs = v.transientSubclusterMustMatchTemplate(allErrs)
if len(allErrs) == 0 {
return nil
}
Expand Down Expand Up @@ -617,7 +618,7 @@ func (v *VerticaDB) hasValidTemporarySubclusterRouting(allErrs field.ErrorList)
"size of subcluster template must be greater than zero")
allErrs = append(allErrs, err)
}
if _, ok := scMap[v.Spec.TemporarySubclusterRouting.Template.Name]; ok {
if sc, ok := scMap[v.Spec.TemporarySubclusterRouting.Template.Name]; ok && !sc.IsTransient {
err := field.Invalid(templateFieldPrefix.Child("name"),
v.Spec.TemporarySubclusterRouting.Template.Name,
"cannot choose a name of an existing subcluster")
Expand Down Expand Up @@ -700,6 +701,27 @@ func (v *VerticaDB) matchingServiceNamesAreConsistent(allErrs field.ErrorList) f
return allErrs
}

// transientSubclusterMustMatchTemplate verifies that every subcluster flagged
// with IsTransient has the same name as the temporary subcluster routing
// template. IsTransient must only be used for the subcluster that corresponds
// to that template, so a transient subcluster under any other name is
// reported as invalid.
//
// Any violations are appended to allErrs, and the resulting list is returned.
func (v *VerticaDB) transientSubclusterMustMatchTemplate(allErrs field.ErrorList) field.ErrorList {
	templateName := v.Spec.TemporarySubclusterRouting.Template.Name
	for i := range v.Spec.Subclusters {
		sc := &v.Spec.Subclusters[i]
		if !sc.IsTransient || sc.Name == templateName {
			continue
		}

		// The subcluster is already identified by its index, so only the field
		// name is appended. This yields spec.subclusters[i].name.
		namePath := field.NewPath("spec").Child("subclusters").Index(i).Child("name")
		err := field.Invalid(namePath,
			sc.Name,
			"Transient subcluster name doesn't match template")
		allErrs = append(allErrs, err)
	}
	return allErrs
}

func (v *VerticaDB) isImageChangeInProgress() bool {
return len(v.Status.Conditions) > ImageChangeInProgressIndex &&
v.Status.Conditions[ImageChangeInProgressIndex].Status == v1.ConditionTrue
Expand Down
13 changes: 13 additions & 0 deletions api/v1beta1/verticadb_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@ var _ = Describe("verticadb_webhook", func() {
validateSpecValuesHaveErr(vdb, false)
})

// Negative test: a subcluster flagged as transient whose name differs from the
// temporary subcluster routing template.
It("prevent transient subcluster having a different name then the template", func() {
vdb := createVDBHelper()
// sc2 is marked transient but is named differently than the template below.
vdb.Spec.Subclusters = []Subcluster{
{Name: "sc1", Size: 1, IsPrimary: true},
{Name: "sc2", Size: 1, IsPrimary: false, IsTransient: true},
}
// The template is named "transient", which does not match "sc2".
vdb.Spec.TemporarySubclusterRouting.Template = Subcluster{
Name: "transient",
Size: 1,
IsPrimary: false,
}
// NOTE(review): presumably the 'true' argument means validation is expected
// to report an error -- confirm against validateSpecValuesHaveErr.
validateSpecValuesHaveErr(vdb, true)
})
})

func createVDBHelper() *VerticaDB {
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func BuildStsSpec(nm types.NamespacedName, vdb *vapi.VerticaDB, sc *vapi.Subclus
},
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: MakeSvcSelectorLabelsForSubclusterNameRouting(vdb, sc),
MatchLabels: MakeStsSelectorLabels(vdb, sc),
},
ServiceName: names.GenHlSvcName(vdb).Name,
Replicas: &sc.Size,
Expand Down
33 changes: 29 additions & 4 deletions pkg/builder/labels_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,22 @@ const (
SubclusterTypeLabel = "vertica.com/subcluster-type"
SubclusterSvcNameLabel = "vertica.com/subcluster-svc"
SubclusterTransientLabel = "vertica.com/subcluster-transient"
VDBInstanceLabel = "app.kubernetes.io/instance"
OperatorVersionLabel = "app.kubernetes.io/version"
OperatorName = "verticadb-operator" // The name of the operator

// ClientRoutingLabel is a label that must exist on the pod in
// order for Service objects to route to the pod. This label isn't part of
// the template in the StatefulSet. This label is added after the pod is
// scheduled. There are a couple of uses for it:
// - after an add node, we only add the labels once the node has at least
// one shard subscription. This saves routing to a pod that cannot fulfill
// a query request.
// - before we remove a node. It allows us to drain out pods that are going
// to be removed by a pending node removal.
ClientRoutingLabel = "vertica.com/client-routing"
ClientRoutingVal = "true"

VDBInstanceLabel = "app.kubernetes.io/instance"
OperatorVersionLabel = "app.kubernetes.io/version"
OperatorName = "verticadb-operator" // The name of the operator

CurOperatorVersion = "1.3.1" // The version number of the operator
OperatorVersion100 = "1.0.0"
Expand Down Expand Up @@ -132,14 +145,26 @@ func MakeBaseSvcSelectorLabels(vdb *vapi.VerticaDB) map[string]string {
func MakeSvcSelectorLabelsForServiceNameRouting(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string {
	selector := MakeBaseSvcSelectorLabels(vdb)
	// Pods are picked by the service name of the subcluster.
	selector[SubclusterSvcNameLabel] = sc.GetServiceName()
	// Requiring the client routing label restricts routing to nodes that have
	// verified they own at least one shard and aren't pending delete.
	selector[ClientRoutingLabel] = ClientRoutingVal
	return selector
}

// MakeSvcSelectorLabelsForSubclusterNameRouting builds the selector labels for
// a service object that should pick its pods by subcluster name. The selector
// also requires the client routing label to be present on the pod.
func MakeSvcSelectorLabelsForSubclusterNameRouting(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string {
	selector := MakeBaseSvcSelectorLabels(vdb)
	// Match on the subcluster name rather than the service name.
	selector[SubclusterNameLabel] = sc.Name
	selector[ClientRoutingLabel] = ClientRoutingVal
	return selector
}

// MakeStsSelectorLabels builds the selector labels used by a StatefulSet: the
// base selector labels plus the name of the subcluster.
func MakeStsSelectorLabels(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string {
	selector := MakeBaseSvcSelectorLabels(vdb)
	selector[SubclusterNameLabel] = sc.Name
	return selector
}
153 changes: 153 additions & 0 deletions pkg/controllers/clientroutinglabel_reconcile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
(c) Copyright [2021-2022] Micro Focus or one of its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"fmt"

vapi "github.com/vertica/vertica-kubernetes/api/v1beta1"
"github.com/vertica/vertica-kubernetes/pkg/builder"
verrors "github.com/vertica/vertica-kubernetes/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ApplyMethodType identifies the situation in which the client routing label
// reconciler is being run. It determines whether labels are added or removed.
type ApplyMethodType string

// The apply methods understood by ClientRoutingLabelReconciler.
const (
AddNodeApplyMethod ApplyMethodType = "Add" // Called after a db_add_node
PodRescheduleApplyMethod ApplyMethodType = "PodReschedule" // Called after pod was rescheduled and vertica restarted
DelNodeApplyMethod ApplyMethodType = "RemoveNode" // Called before a db_remove_node
)

// ClientRoutingLabelReconciler adds or removes the client routing label on
// pods, which controls whether Service objects route client traffic to them.
type ClientRoutingLabelReconciler struct {
// VRec supplies the logger and the Kubernetes client used by this reconciler.
VRec *VerticaDBReconciler
Vdb *vapi.VerticaDB // Vdb is the CRD we are acting on.
// PFacts holds the per-pod facts that are collected and consulted when
// deciding whether a pod should carry the label.
PFacts *PodFacts
// ApplyMethod selects whether this run adds or removes labels.
ApplyMethod ApplyMethodType
ScName string // Subcluster we are going to reconcile. Blank if all subclusters.
}

// MakeClientRoutingLabelReconciler constructs a reconciler that manages the
// client routing label on pods. applyMethod selects whether labels are added
// or removed. scName limits the reconcile to a single subcluster; pass an
// empty string to cover all subclusters.
func MakeClientRoutingLabelReconciler(vdbrecon *VerticaDBReconciler,
	vdb *vapi.VerticaDB, pfacts *PodFacts, applyMethod ApplyMethodType, scName string) ReconcileActor {
	actor := &ClientRoutingLabelReconciler{}
	actor.VRec = vdbrecon
	actor.Vdb = vdb
	actor.PFacts = pfacts
	actor.ApplyMethod = applyMethod
	actor.ScName = scName
	return actor
}

// Reconcile adds or removes the label that controls whether a pod accepts
// client connections. It first collects the pod facts, then visits each pod,
// restricted to the subcluster in ScName when that is set, and reconciles its
// label.
//
// A pod that asks for a requeue does not stop the loop: the remaining pods are
// still processed and the requeue result is returned at the end. An error from
// any pod is returned immediately.
func (c *ClientRoutingLabelReconciler) Reconcile(ctx context.Context, req *ctrl.Request) (ctrl.Result, error) {
	c.VRec.Log.Info("Reconcile client routing label", "applyMethod", c.ApplyMethod)

	if err := c.PFacts.Collect(ctx, c.Vdb); err != nil {
		return ctrl.Result{}, err
	}

	var requeueRes ctrl.Result
	for podName, fact := range c.PFacts.Detail {
		// Skip pods outside of the subcluster we were asked to reconcile.
		if c.ScName != "" && fact.subcluster != c.ScName {
			continue
		}
		c.VRec.Log.Info("Considering changing routing label for pod", "name", fact.name)

		res, err := c.reconcilePod(ctx, podName, fact)
		if !verrors.IsReconcileAborted(res, err) {
			continue
		}
		if err != nil {
			return res, err
		}
		// Aborted only because of a requeue. Remember it, but keep going so
		// that the other pods are reconciled before we bail out.
		requeueRes = res
	}
	return requeueRes, nil
}

// reconcilePod reconciles the client routing label of a single pod. The pod is
// fetched, its label is adjusted in memory and the difference is patched back.
// The returned result has Requeue set when, for the add node apply method, the
// pod is up and not pending delete but has no shard subscriptions yet.
func (c *ClientRoutingLabelReconciler) reconcilePod(ctx context.Context, pn types.NamespacedName, pf *PodFact) (ctrl.Result, error) {
	var res ctrl.Result

	updateLabel := func() error {
		pod := &corev1.Pod{}
		switch getErr := c.VRec.Client.Get(ctx, pn, pod); {
		case getErr == nil:
			// Pod was fetched, carry on.
		case errors.IsNotFound(getErr):
			// A missing pod is fine; there is nothing to add a label to or
			// remove a label from.
			return nil
		default:
			return getErr
		}

		// Snapshot the pod before changing it so the patch only carries the
		// label difference.
		original := pod.DeepCopy()
		c.manipulateRoutingLabelInPod(pod, pf)
		if patchErr := c.VRec.Client.Patch(ctx, pod, client.MergeFrom(original)); patchErr != nil {
			return patchErr
		}

		awaitingShards := pf.upNode && pf.shardSubscriptions == 0 && !pf.pendingDelete
		if c.ApplyMethod == AddNodeApplyMethod && awaitingShards {
			c.VRec.Log.Info("Will requeue reconciliation because pod does not have any shard subscriptions yet", "name", pf.name)
			res.Requeue = true
		}
		return nil
	}

	// We retry in case someone else updated the pod since we last fetched it.
	err := retry.RetryOnConflict(retry.DefaultBackoff, updateLabel)
	return res, err
}

// manipulateRoutingLabelInPod adds or removes the client routing label in the
// given pod object, based on the apply method and the facts for the pod. Only
// the in-memory pod is changed; nothing is written to the API server here.
//
// The behavior per apply method is:
//   - AddNodeApplyMethod, PodRescheduleApplyMethod: the label is added when it
//     isn't already present and the pod is up, has at least one shard
//     subscription and is not pending delete.
//   - DelNodeApplyMethod: the label is removed when it is present and the pod
//     is pending delete, so that client connections stop getting routed there.
//
// Any other apply method leaves the pod untouched.
func (c *ClientRoutingLabelReconciler) manipulateRoutingLabelInPod(pod *corev1.Pod, pf *PodFact) {
	// Reading from a nil label map is safe and reports the label as missing.
	_, labelExists := pod.Labels[builder.ClientRoutingLabel]

	switch c.ApplyMethod {
	case AddNodeApplyMethod, PodRescheduleApplyMethod:
		if !labelExists && pf.upNode && pf.shardSubscriptions > 0 && !pf.pendingDelete {
			// Writing to a nil map panics, so allocate the label map first if
			// the pod doesn't have any labels at all.
			if pod.Labels == nil {
				pod.Labels = map[string]string{}
			}
			pod.Labels[builder.ClientRoutingLabel] = builder.ClientRoutingVal
			c.VRec.Log.Info("Adding client routing label", "pod",
				pod.Name, "label", fmt.Sprintf("%s=%s", builder.ClientRoutingLabel, builder.ClientRoutingVal))
		}
	case DelNodeApplyMethod:
		if labelExists && pf.pendingDelete {
			delete(pod.Labels, builder.ClientRoutingLabel)
			c.VRec.Log.Info("Removing client routing label", "pod",
				pod.Name, "label", fmt.Sprintf("%s=%s", builder.ClientRoutingLabel, builder.ClientRoutingVal))
		}
	}
}
Loading