diff --git a/Makefile b/Makefile index 2bdc7f073..64df8920d 100644 --- a/Makefile +++ b/Makefile @@ -153,6 +153,8 @@ E2E_TEST_DIRS?=tests/e2e else E2E_TEST_DIRS?=tests/e2e tests/e2e-extra endif +# Additional arguments to pass to 'kubectl kuttl' +E2E_ADDITIONAL_ARGS?= # Specify how to deploy the operator. Allowable values are 'helm', 'olm' or 'random'. # When deploying with olm, it is expected that `make setup-olm` has been run @@ -246,7 +248,7 @@ run-int-tests: install-kuttl-plugin vdb-gen setup-e2e-communal ## Run the integr ifeq ($(DEPLOY_WITH), $(filter $(DEPLOY_WITH), olm random)) $(MAKE) setup-olm endif - kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) $(E2E_TEST_DIRS) + kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) $(E2E_ADDITIONAL_ARGS) $(E2E_TEST_DIRS) .PHONY: run-online-upgrade-tests run-online-upgrade-tests: install-kuttl-plugin setup-e2e-communal ## Run integration tests that only work on Vertica 11.1+ server @@ -256,7 +258,7 @@ endif ifeq ($(BASE_VERTICA_IMG), ) $(error $$BASE_VERTICA_IMG not set) endif - kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) tests/e2e-online-upgrade/ + kubectl kuttl test --report xml --artifacts-dir ${LOGDIR} --parallel $(E2E_PARALLELISM) $(E2E_ADDITIONAL_ARGS) tests/e2e-online-upgrade/ .PHONY: run-soak-tests run-soak-tests: install-kuttl-plugin kuttl-step-gen ## Run the soak tests diff --git a/api/v1beta1/verticadb_types.go b/api/v1beta1/verticadb_types.go index b97f6fd72..a636e2ba6 100644 --- a/api/v1beta1/verticadb_types.go +++ b/api/v1beta1/verticadb_types.go @@ -839,6 +839,16 @@ func MakeVDBName() types.NamespacedName { return types.NamespacedName{Name: "vertica-sample", Namespace: "default"} } +// FindTransientSubcluster will return a pointer to the transient subcluster if one exists +func (v *VerticaDB) FindTransientSubcluster() *Subcluster { + for i := range v.Spec.Subclusters { + if v.Spec.Subclusters[i].IsTransient { + return &v.Spec.Subclusters[i] + } + } + return nil +} + // MakeVDB is a helper that constructs a fully formed VerticaDB struct using the sample name. // This is intended for test purposes. func MakeVDB() *VerticaDB { diff --git a/api/v1beta1/verticadb_webhook.go b/api/v1beta1/verticadb_webhook.go index 2c177a7aa..ace55b8cc 100644 --- a/api/v1beta1/verticadb_webhook.go +++ b/api/v1beta1/verticadb_webhook.go @@ -268,6 +268,7 @@ func (v *VerticaDB) validateVerticaDBSpec() field.ErrorList { allErrs = v.hasValidKerberosSetup(allErrs) allErrs = v.hasValidTemporarySubclusterRouting(allErrs) allErrs = v.matchingServiceNamesAreConsistent(allErrs) + allErrs = v.transientSubclusterMustMatchTemplate(allErrs) if len(allErrs) == 0 { return nil } @@ -617,7 +618,7 @@ func (v *VerticaDB) hasValidTemporarySubclusterRouting(allErrs field.ErrorList) "size of subcluster template must be greater than zero") allErrs = append(allErrs, err) } - if _, ok := scMap[v.Spec.TemporarySubclusterRouting.Template.Name]; ok { + if sc, ok := scMap[v.Spec.TemporarySubclusterRouting.Template.Name]; ok && !sc.IsTransient { err := field.Invalid(templateFieldPrefix.Child("name"), v.Spec.TemporarySubclusterRouting.Template.Name, "cannot choose a name of an existing subcluster") @@ -700,6 +701,27 @@ func (v *VerticaDB) matchingServiceNamesAreConsistent(allErrs field.ErrorList) f return allErrs } +// transientSubclusterMustMatchTemplate is a check to make sure the IsTransient +// isn't being set for subcluster. 
It must only be used for the temporary
+// subcluster template.
+func (v *VerticaDB) transientSubclusterMustMatchTemplate(allErrs field.ErrorList) field.ErrorList {
+	for i := range v.Spec.Subclusters {
+		sc := &v.Spec.Subclusters[i]
+		if !sc.IsTransient {
+			continue
+		}
+
+		fieldPrefix := field.NewPath("spec").Child("subclusters").Index(i)
+		if sc.Name != v.Spec.TemporarySubclusterRouting.Template.Name {
+			err := field.Invalid(fieldPrefix.Child("Name").Index(i),
+				sc.Name,
+				"Transient subcluster name doesn't match template")
+			allErrs = append(allErrs, err)
+		}
+	}
+	return allErrs
+}
+
 func (v *VerticaDB) isImageChangeInProgress() bool {
 	return len(v.Status.Conditions) > ImageChangeInProgressIndex &&
 		v.Status.Conditions[ImageChangeInProgressIndex].Status == v1.ConditionTrue
diff --git a/api/v1beta1/verticadb_webhook_test.go b/api/v1beta1/verticadb_webhook_test.go
index 718ca38da..df3db6c57 100644
--- a/api/v1beta1/verticadb_webhook_test.go
+++ b/api/v1beta1/verticadb_webhook_test.go
@@ -455,6 +455,19 @@ var _ = Describe("verticadb_webhook", func() {
 		validateSpecValuesHaveErr(vdb, false)
 	})
 
+	It("prevent transient subcluster having a different name than the template", func() {
+		vdb := createVDBHelper()
+		vdb.Spec.Subclusters = []Subcluster{
+			{Name: "sc1", Size: 1, IsPrimary: true},
+			{Name: "sc2", Size: 1, IsPrimary: false, IsTransient: true},
+		}
+		vdb.Spec.TemporarySubclusterRouting.Template = Subcluster{
+			Name:      "transient",
+			Size:      1,
+			IsPrimary: false,
+		}
+		validateSpecValuesHaveErr(vdb, true)
+	})
 })
 
 func createVDBHelper() *VerticaDB {
diff --git a/pkg/builder/builder.go b/pkg/builder/builder.go
index 07173056f..1323790da 100644
--- a/pkg/builder/builder.go
+++ b/pkg/builder/builder.go
@@ -443,7 +443,7 @@ func BuildStsSpec(nm types.NamespacedName, vdb *vapi.VerticaDB, sc *vapi.Subclus
 		},
 		Spec: appsv1.StatefulSetSpec{
 			Selector: &metav1.LabelSelector{
-				MatchLabels: MakeSvcSelectorLabelsForSubclusterNameRouting(vdb, sc),
+				MatchLabels: MakeStsSelectorLabels(vdb, sc),
 			},
 			ServiceName: names.GenHlSvcName(vdb).Name,
 			Replicas:    &sc.Size,
diff --git a/pkg/builder/labels_annotations.go b/pkg/builder/labels_annotations.go
index 89165f6fb..a856929eb 100644
--- a/pkg/builder/labels_annotations.go
+++ b/pkg/builder/labels_annotations.go
@@ -28,9 +28,22 @@ const (
 	SubclusterTypeLabel      = "vertica.com/subcluster-type"
 	SubclusterSvcNameLabel   = "vertica.com/subcluster-svc"
 	SubclusterTransientLabel = "vertica.com/subcluster-transient"
-	VDBInstanceLabel         = "app.kubernetes.io/instance"
-	OperatorVersionLabel     = "app.kubernetes.io/version"
-	OperatorName             = "verticadb-operator" // The name of the operator
+
+	// ClientRoutingLabel is a label that must exist on the pod in
+	// order for Service objects to route to the pod. This label isn't part of
+	// the template in the StatefulSet. This label is added after the pod is
+	// scheduled. There are a couple of uses for it:
+	// - after an add node, we only add the labels once the node has at least
+	// one shard subscription. This avoids routing to a pod that cannot fulfill
+	// a query request.
+	// - before we remove a node. It allows us to drain out pods that are going
+	// to be removed by a pending node removal.
+ ClientRoutingLabel = "vertica.com/client-routing" + ClientRoutingVal = "true" + + VDBInstanceLabel = "app.kubernetes.io/instance" + OperatorVersionLabel = "app.kubernetes.io/version" + OperatorName = "verticadb-operator" // The name of the operator CurOperatorVersion = "1.3.1" // The version number of the operator OperatorVersion100 = "1.0.0" @@ -132,6 +145,9 @@ func MakeBaseSvcSelectorLabels(vdb *vapi.VerticaDB) map[string]string { func MakeSvcSelectorLabelsForServiceNameRouting(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string { m := MakeBaseSvcSelectorLabels(vdb) m[SubclusterSvcNameLabel] = sc.GetServiceName() + // Only route to nodes that have verified they own at least one shard and + // aren't pending delete + m[ClientRoutingLabel] = ClientRoutingVal return m } @@ -139,7 +155,16 @@ func MakeSvcSelectorLabelsForServiceNameRouting(vdb *vapi.VerticaDB, sc *vapi.Su // we want a service object to pick the pods based on the subcluster name. func MakeSvcSelectorLabelsForSubclusterNameRouting(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string { m := MakeBaseSvcSelectorLabels(vdb) - // Routing is done solely with the subcluster name. + // Routing is done using the subcluster name rather than the service name. + m[SubclusterNameLabel] = sc.Name + m[ClientRoutingLabel] = ClientRoutingVal + + return m +} + +// MakeStsSelectorLabels will create the selector labels for use within a StatefulSet +func MakeStsSelectorLabels(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string { + m := MakeBaseSvcSelectorLabels(vdb) m[SubclusterNameLabel] = sc.Name return m } diff --git a/pkg/controllers/clientroutinglabel_reconcile.go b/pkg/controllers/clientroutinglabel_reconcile.go new file mode 100644 index 000000000..c6045f6e4 --- /dev/null +++ b/pkg/controllers/clientroutinglabel_reconcile.go @@ -0,0 +1,153 @@ +/* + (c) Copyright [2021-2022] Micro Focus or one of its affiliates. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + + vapi "github.com/vertica/vertica-kubernetes/api/v1beta1" + "github.com/vertica/vertica-kubernetes/pkg/builder" + verrors "github.com/vertica/vertica-kubernetes/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ApplyMethodType string + +const ( + AddNodeApplyMethod ApplyMethodType = "Add" // Called after a db_add_node + PodRescheduleApplyMethod ApplyMethodType = "PodReschedule" // Called after pod was rescheduled and vertica restarted + DelNodeApplyMethod ApplyMethodType = "RemoveNode" // Called before a db_remove_node +) + +type ClientRoutingLabelReconciler struct { + VRec *VerticaDBReconciler + Vdb *vapi.VerticaDB // Vdb is the CRD we are acting on. + PFacts *PodFacts + ApplyMethod ApplyMethodType + ScName string // Subcluster we are going to reconcile. Blank if all subclusters. 
+}
+
+func MakeClientRoutingLabelReconciler(vdbrecon *VerticaDBReconciler,
+	vdb *vapi.VerticaDB, pfacts *PodFacts, applyMethod ApplyMethodType, scName string) ReconcileActor {
+	return &ClientRoutingLabelReconciler{
+		VRec:        vdbrecon,
+		Vdb:         vdb,
+		PFacts:      pfacts,
+		ApplyMethod: applyMethod,
+		ScName:      scName,
+	}
+}
+
+// Reconcile will add or remove the labels that control whether a pod accepts
+// client connections. Pods that own at least one shard will have the label added
+// so that they receive traffic. Pods that don't own a shard or are about to be
+// scaled down will have the label removed so that traffic isn't routed to them.
+func (c *ClientRoutingLabelReconciler) Reconcile(ctx context.Context, req *ctrl.Request) (ctrl.Result, error) {
+	c.VRec.Log.Info("Reconcile client routing label", "applyMethod", c.ApplyMethod)
+
+	if err := c.PFacts.Collect(ctx, c.Vdb); err != nil {
+		return ctrl.Result{}, err
+	}
+
+	var savedRes ctrl.Result
+	for pn, pf := range c.PFacts.Detail {
+		if c.ScName != "" && pf.subcluster != c.ScName {
+			continue
+		}
+		c.VRec.Log.Info("Considering changing routing label for pod", "name", pf.name)
+		if res, err := c.reconcilePod(ctx, pn, c.PFacts.Detail[pn]); verrors.IsReconcileAborted(res, err) {
+			if err == nil {
+				// If we fail due to a requeue, we will attempt to reconcile other pods before ultimately bailing out.
+				savedRes = res
+				continue
+			}
+			return res, err
+		}
+	}
+	return savedRes, nil
+}
+
+// reconcilePod will handle checking for the label of a single pod
+func (c *ClientRoutingLabelReconciler) reconcilePod(ctx context.Context, pn types.NamespacedName, pf *PodFact) (ctrl.Result, error) {
+	var res ctrl.Result
+	// We retry in case someone else updated the pod since we last fetched it
+	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		pod := &corev1.Pod{}
+		if e := c.VRec.Client.Get(ctx, pn, pod); e != nil {
+			// Not found errors are okay to ignore since there is no pod to
+			// add/remove a label.
+			if errors.IsNotFound(e) {
+				return nil
+			}
+			return e
+		}
+
+		patch := client.MergeFrom(pod.DeepCopy())
+		c.manipulateRoutingLabelInPod(pod, pf)
+		err := c.VRec.Client.Patch(ctx, pod, patch)
+		if err != nil {
+			return err
+		}
+
+		if c.ApplyMethod == AddNodeApplyMethod && pf.upNode && pf.shardSubscriptions == 0 && !pf.pendingDelete {
+			c.VRec.Log.Info("Will requeue reconciliation because pod does not have any shard subscriptions yet", "name", pf.name)
+			res.Requeue = true
+		}
+		return nil
+	})
+	return res, err
+}
+
+func (c *ClientRoutingLabelReconciler) manipulateRoutingLabelInPod(pod *corev1.Pod, pf *PodFact) {
+	_, labelExists := pod.Labels[builder.ClientRoutingLabel]
+
+	// There are 4 cases this reconciler is used:
+	// 1) Called after add node
+	// 2) Called after pod reschedule + restart
+	// 3) Called before remove node
+	// 4) Called before removal of a subcluster
+	//
+	// For 1) and 2), we are going to add labels to qualify pods. For 2),
+	// we will reschedule as this reconciler is usually paired with a
+	// rebalance_shards() call.
+	//
+	// For 3), we are going to remove labels so that client connections
+	// stop getting routed there. This only applies to pods that are
+	// pending delete.
+	//
+	// For 4), like 3), we are going to remove labels. This applies to the
+	// entire subcluster, so pending delete isn't checked.
+ switch c.ApplyMethod { + case AddNodeApplyMethod, PodRescheduleApplyMethod: + if !labelExists && pf.upNode && pf.shardSubscriptions > 0 && !pf.pendingDelete { + pod.Labels[builder.ClientRoutingLabel] = builder.ClientRoutingVal + c.VRec.Log.Info("Adding client routing label", "pod", + pod.Name, "label", fmt.Sprintf("%s=%s", builder.ClientRoutingLabel, builder.ClientRoutingVal)) + } + case DelNodeApplyMethod: + if labelExists && pf.pendingDelete { + delete(pod.Labels, builder.ClientRoutingLabel) + c.VRec.Log.Info("Removing client routing label", "pod", + pod.Name, "label", fmt.Sprintf("%s=%s", builder.ClientRoutingLabel, builder.ClientRoutingVal)) + } + } +} diff --git a/pkg/controllers/clientroutinglabel_reconcile_test.go b/pkg/controllers/clientroutinglabel_reconcile_test.go new file mode 100644 index 000000000..473772cb6 --- /dev/null +++ b/pkg/controllers/clientroutinglabel_reconcile_test.go @@ -0,0 +1,163 @@ +/* + (c) Copyright [2021-2022] Micro Focus or one of its affiliates. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package controllers + +import ( + "context" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + vapi "github.com/vertica/vertica-kubernetes/api/v1beta1" + "github.com/vertica/vertica-kubernetes/pkg/builder" + "github.com/vertica/vertica-kubernetes/pkg/cmds" + "github.com/vertica/vertica-kubernetes/pkg/names" + "github.com/vertica/vertica-kubernetes/pkg/test" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" +) + +var _ = Describe("subscriptionlabel_reconcile", func() { + ctx := context.Background() + + It("should add label to pods that have at least one shard subscription", func() { + vdb := vapi.MakeVDB() + vdb.Spec.Subclusters = []vapi.Subcluster{ + {Name: "sc1", Size: 1}, + {Name: "sc2", Size: 1}, + } + test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) + defer test.DeletePods(ctx, k8sClient, vdb) + + fpr := &cmds.FakePodRunner{} + pfacts := MakePodFacts(k8sClient, fpr) + Expect(pfacts.Collect(ctx, vdb)).Should(Succeed()) + pfn1 := names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0) + pfacts.Detail[pfn1].upNode = true + pfacts.Detail[pfn1].shardSubscriptions = 0 + pfn2 := names.GenPodName(vdb, &vdb.Spec.Subclusters[1], 0) + pfacts.Detail[pfn2].shardSubscriptions = 3 + pfacts.Detail[pfn2].upNode = true + r := MakeClientRoutingLabelReconciler(vdbRec, vdb, &pfacts, PodRescheduleApplyMethod, "") + Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) + + pod := &corev1.Pod{} + Expect(k8sClient.Get(ctx, pfn1, pod)).Should(Succeed()) + _, ok := pod.Labels[builder.ClientRoutingLabel] + Expect(ok).Should(BeFalse()) + Expect(k8sClient.Get(ctx, pfn2, pod)).Should(Succeed()) + v, ok := pod.Labels[builder.ClientRoutingLabel] + Expect(ok).Should(BeTrue()) + Expect(v).Should(Equal(builder.ClientRoutingVal)) + }) + + It("should ignore second subcluster when sc filter is used", func() { + vdb := vapi.MakeVDB() + vdb.Spec.Subclusters = []vapi.Subcluster{ + {Name: "sc1", Size: 1}, + {Name: "sc2", Size: 1}, + } + test.CreatePods(ctx, 
k8sClient, vdb, test.AllPodsRunning) + defer test.DeletePods(ctx, k8sClient, vdb) + + fpr := &cmds.FakePodRunner{} + pfacts := MakePodFacts(k8sClient, fpr) + Expect(pfacts.Collect(ctx, vdb)).Should(Succeed()) + pfn1 := names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0) + pfacts.Detail[pfn1].upNode = true + pfacts.Detail[pfn1].shardSubscriptions = 5 + pfn2 := names.GenPodName(vdb, &vdb.Spec.Subclusters[1], 0) + pfacts.Detail[pfn2].shardSubscriptions = 3 + pfacts.Detail[pfn2].upNode = true + r := MakeClientRoutingLabelReconciler(vdbRec, vdb, &pfacts, PodRescheduleApplyMethod, vdb.Spec.Subclusters[0].Name) + Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) + + pod := &corev1.Pod{} + Expect(k8sClient.Get(ctx, pfn1, pod)).Should(Succeed()) + v, ok := pod.Labels[builder.ClientRoutingLabel] + Expect(ok).Should(BeTrue()) + Expect(v).Should(Equal(builder.ClientRoutingVal)) + Expect(k8sClient.Get(ctx, pfn2, pod)).Should(Succeed()) + _, ok = pod.Labels[builder.ClientRoutingLabel] + Expect(ok).Should(BeFalse()) + }) + + It("should requeue if at least one pod does not have subscriptions but other pods should have label", func() { + vdb := vapi.MakeVDB() + vdb.Spec.Subclusters = []vapi.Subcluster{ + {Name: "sc1", Size: 5}, + } + test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) + defer test.DeletePods(ctx, k8sClient, vdb) + + fpr := &cmds.FakePodRunner{} + pfacts := MakePodFacts(k8sClient, fpr) + Expect(pfacts.Collect(ctx, vdb)).Should(Succeed()) + sc := &vdb.Spec.Subclusters[0] + for i := int32(0); i < sc.Size; i++ { + pn := names.GenPodName(vdb, &vdb.Spec.Subclusters[0], i) + pfacts.Detail[pn].upNode = true + pfacts.Detail[pn].shardSubscriptions = int(i) // Ensures that only one pod will not have subscriptions + } + r := MakeClientRoutingLabelReconciler(vdbRec, vdb, &pfacts, AddNodeApplyMethod, "") + Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{Requeue: true})) + + pod := &corev1.Pod{} + for i := int32(0); i < sc.Size; i++ { + pn := names.GenPodName(vdb, &vdb.Spec.Subclusters[0], i) + Expect(k8sClient.Get(ctx, pn, pod)).Should(Succeed()) + v, ok := pod.Labels[builder.ClientRoutingLabel] + if i == 0 { + Expect(ok).Should(BeFalse()) + } else { + Expect(ok).Should(BeTrue()) + Expect(v).Should(Equal(builder.ClientRoutingVal)) + } + } + }) + + It("should remove label when pod is pending delete", func() { + vdb := vapi.MakeVDB() + vdb.Spec.Subclusters = []vapi.Subcluster{ + {Name: "sc1", Size: 1}, + } + test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) + defer test.DeletePods(ctx, k8sClient, vdb) + + fpr := &cmds.FakePodRunner{} + pfacts := MakePodFacts(k8sClient, fpr) + Expect(pfacts.Collect(ctx, vdb)).Should(Succeed()) + pn := names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0) + pfacts.Detail[pn].upNode = true + pfacts.Detail[pn].shardSubscriptions = 10 + act := MakeClientRoutingLabelReconciler(vdbRec, vdb, &pfacts, AddNodeApplyMethod, "") + r := act.(*ClientRoutingLabelReconciler) + Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) + + pod := &corev1.Pod{} + Expect(k8sClient.Get(ctx, pn, pod)).Should(Succeed()) + v, ok := pod.Labels[builder.ClientRoutingLabel] + Expect(ok).Should(BeTrue()) + Expect(v).Should(Equal(builder.ClientRoutingVal)) + + pfacts.Detail[pn].pendingDelete = true + r.ApplyMethod = DelNodeApplyMethod + Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) + + Expect(k8sClient.Get(ctx, pn, pod)).Should(Succeed()) + _, ok = pod.Labels[builder.ClientRoutingLabel] + 
Expect(ok).Should(BeFalse())
+	})
+})
diff --git a/pkg/controllers/obj_reconcile.go b/pkg/controllers/obj_reconcile.go
index 47d223e8f..1a424e5d7 100644
--- a/pkg/controllers/obj_reconcile.go
+++ b/pkg/controllers/obj_reconcile.go
@@ -65,6 +65,10 @@ func MakeObjReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger, vdb *vapi
 // Reconcile is the main driver for reconciliation of Kubernetes objects.
 // This will ensure the desired svc and sts objects exist and are in the correct state.
 func (o *ObjReconciler) Reconcile(ctx context.Context, req *ctrl.Request) (ctrl.Result, error) {
+	if err := o.PFacts.Collect(ctx, o.Vdb); err != nil {
+		return ctrl.Result{}, err
+	}
+
 	// Ensure any secrets/configMaps that we mount exist with the correct keys.
 	// We catch the errors here so that we can provide timely events.
 	if res, err := o.checkMountedObjs(ctx); verrors.IsReconcileAborted(res, err) {
diff --git a/pkg/controllers/offlineupgrade_reconcile.go b/pkg/controllers/offlineupgrade_reconcile.go
index 70ca6fb9e..39a058674 100644
--- a/pkg/controllers/offlineupgrade_reconcile.go
+++ b/pkg/controllers/offlineupgrade_reconcile.go
@@ -96,6 +96,8 @@ func (o *OfflineUpgradeReconciler) Reconcile(ctx context.Context, req *ctrl.Requ
 		// Start up vertica in each pod.
 		o.postRestartingClusterMsg,
 		o.restartCluster,
+		// Apply labels so svc objects can route to the new pods that came up
+		o.addClientRoutingLabel,
 		// Cleanup up the condition and event recording for a completed upgrade
 		o.Manager.finishUpgrade,
 	}
@@ -258,6 +260,15 @@ func (o *OfflineUpgradeReconciler) restartCluster(ctx context.Context) (ctrl.Res
 	return r.Reconcile(ctx, &ctrl.Request{})
 }
 
+// addClientRoutingLabel will add the special label we use so that Service
+// objects will route to the pods. This is done after the pods have been
+// rescheduled and vertica restarted.
+func (o *OfflineUpgradeReconciler) addClientRoutingLabel(ctx context.Context) (ctrl.Result, error) {
+	r := MakeClientRoutingLabelReconciler(o.VRec, o.Vdb, o.PFacts,
+		PodRescheduleApplyMethod, "" /* all subclusters */)
+	return r.Reconcile(ctx, &ctrl.Request{})
+}
+
 // anyPodsRunningWithOldImage will check if any upNode pods are running with the old image.
func (o *OfflineUpgradeReconciler) anyPodsRunningWithOldImage(ctx context.Context) (bool, error) { for pn, pf := range o.PFacts.Detail { diff --git a/pkg/controllers/onlineupgrade_reconcile_test.go b/pkg/controllers/onlineupgrade_reconcile_test.go index 25f51ae41..ac4907004 100644 --- a/pkg/controllers/onlineupgrade_reconcile_test.go +++ b/pkg/controllers/onlineupgrade_reconcile_test.go @@ -45,10 +45,10 @@ var _ = Describe("onlineupgrade_reconcile", func() { test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) defer test.DeletePods(ctx, k8sClient, vdb) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) Expect(r.skipTransientSetup()).Should(BeTrue()) - vdb.Spec.Image = NewImageName + r.Vdb.Spec.Image = NewImageName Expect(r.skipTransientSetup()).Should(BeFalse()) }) @@ -67,16 +67,22 @@ var _ = Describe("onlineupgrade_reconcile", func() { defer test.DeletePods(ctx, k8sClient, vdb) defer test.DeleteSvcs(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) + Expect(r.addTransientToVdb(ctx)).Should(Equal(ctrl.Result{})) Expect(r.createTransientSts(ctx)).Should(Equal(ctrl.Result{})) - transientSc := vdb.BuildTransientSubcluster("") - defer test.DeleteSts(ctx, k8sClient, vdb, transientSc, 1) // Add to defer for pods in transient + var nilSc *vapi.Subcluster + transientSc := vdb.FindTransientSubcluster() + Expect(transientSc).ShouldNot(Equal(nilSc)) fetchedSts := &appsv1.StatefulSet{} - Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), fetchedSts)) + Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), fetchedSts)).Should(Succeed()) + + Expect(r.removeTransientFromVdb(ctx)).Should(Equal(ctrl.Result{})) + Expect(vdb.FindTransientSubcluster()).Should(Equal(nilSc)) Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) // Collect state again for new pods/sts @@ -87,10 +93,8 @@ var _ = Describe("onlineupgrade_reconcile", func() { r.PFacts.Detail[pn].isInstalled = tristate.False r.PFacts.Detail[pn].dbExists = tristate.False - sts := &appsv1.StatefulSet{} - Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), sts)).Should(Succeed()) Expect(r.deleteTransientSts(ctx)).Should(Equal(ctrl.Result{})) - Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), sts)).ShouldNot(Succeed()) + Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), fetchedSts)).ShouldNot(Succeed()) }) It("should be able to figure out what the old image was", func() { @@ -101,8 +105,9 @@ var _ = Describe("onlineupgrade_reconcile", func() { test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) defer test.DeletePods(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) oldImage, ok := r.fetchOldImage() Expect(ok).Should(BeTrue()) @@ -114,7 +119,7 @@ var _ = Describe("onlineupgrade_reconcile", func() { const ScName = "sc1" const TransientScName = "transient" vdb.Spec.Subclusters = []vapi.Subcluster{ - {Name: ScName, IsPrimary: true}, + {Name: ScName, IsPrimary: true, Size: 1}, } sc := &vdb.Spec.Subclusters[0] 
vdb.Spec.TemporarySubclusterRouting.Template = vapi.Subcluster{ @@ -129,13 +134,14 @@ var _ = Describe("onlineupgrade_reconcile", func() { defer test.DeletePods(ctx, k8sClient, vdb) test.CreateSvcs(ctx, k8sClient, vdb) defer test.DeleteSvcs(ctx, k8sClient, vdb) - transientSc := vdb.BuildTransientSubcluster("") - test.CreateSts(ctx, k8sClient, vdb, transientSc, 1, 0, test.AllPodsNotRunning) - defer test.DeleteSts(ctx, k8sClient, vdb, transientSc, 1) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) + Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) + Expect(r.addTransientToVdb(ctx)).Should(Equal(ctrl.Result{})) + Expect(r.createTransientSts(ctx)).Should(Equal(ctrl.Result{})) Expect(r.routeClientTraffic(ctx, ScName, true)).Should(Succeed()) svc := &corev1.Service{} Expect(k8sClient.Get(ctx, names.GenExtSvcName(vdb, sc), svc)).Should(Succeed()) @@ -153,7 +159,7 @@ var _ = Describe("onlineupgrade_reconcile", func() { vdb := vapi.MakeVDB() const ScName = "sc1" vdb.Spec.Subclusters = []vapi.Subcluster{ - {Name: ScName, IsPrimary: true}, + {Name: ScName, IsPrimary: true, Size: 1}, } sc := &vdb.Spec.Subclusters[0] vdb.Spec.TemporarySubclusterRouting.Template = vapi.Subcluster{ @@ -169,20 +175,23 @@ var _ = Describe("onlineupgrade_reconcile", func() { test.CreateSvcs(ctx, k8sClient, vdb) defer test.DeleteSvcs(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) + Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) Expect(r.routeClientTraffic(ctx, ScName, true)).Should(Succeed()) svc := &corev1.Service{} Expect(k8sClient.Get(ctx, names.GenExtSvcName(vdb, sc), svc)).Should(Succeed()) - Expect(svc.Spec.Selector[builder.SubclusterSvcNameLabel]).Should(Equal(ScName)) - Expect(svc.Spec.Selector[builder.SubclusterNameLabel]).Should(Equal("")) + Expect(svc.Spec.Selector[builder.SubclusterSvcNameLabel]).Should(Equal("")) + Expect(svc.Spec.Selector[builder.SubclusterNameLabel]).Should(Equal(ScName)) + Expect(svc.Spec.Selector[builder.ClientRoutingLabel]).Should(Equal(builder.ClientRoutingVal)) }) It("should avoid creating transient if the cluster is down", func() { vdb := vapi.MakeVDB() const ScName = "sc1" vdb.Spec.Subclusters = []vapi.Subcluster{ - {Name: ScName, IsPrimary: true}, + {Name: ScName, IsPrimary: true, Size: 1}, } vdb.Spec.TemporarySubclusterRouting.Template.Name = "wont-be-created" vdb.Spec.Image = OldImage @@ -193,8 +202,9 @@ var _ = Describe("onlineupgrade_reconcile", func() { test.CreateSvcs(ctx, k8sClient, vdb) defer test.DeleteSvcs(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.skipTransientSetup()).Should(BeTrue()) }) @@ -203,8 +213,8 @@ var _ = Describe("onlineupgrade_reconcile", func() { const PriScName = "pri" const SecScName = "sec" vdb.Spec.Subclusters = []vapi.Subcluster{ - {Name: PriScName, IsPrimary: true}, - {Name: SecScName, IsPrimary: false}, + {Name: PriScName, IsPrimary: true, Size: 1}, + {Name: SecScName, IsPrimary: false, Size: 1}, } vdb.Spec.TemporarySubclusterRouting.Names = []string{"dummy-non-existent", SecScName, PriScName} vdb.Spec.Image = OldImage @@ -215,12 
+225,14 @@ var _ = Describe("onlineupgrade_reconcile", func() { test.CreateSvcs(ctx, k8sClient, vdb) defer test.DeleteSvcs(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) svc := &corev1.Service{} Expect(k8sClient.Get(ctx, names.GenExtSvcName(vdb, &vdb.Spec.Subclusters[0]), svc)).Should(Succeed()) Expect(svc.Spec.Selector[builder.SubclusterSvcNameLabel]).Should(Equal(PriScName)) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) + Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) // Route for primary subcluster Expect(r.routeClientTraffic(ctx, PriScName, true)).Should(Succeed()) @@ -248,8 +260,8 @@ var _ = Describe("onlineupgrade_reconcile", func() { const PriScName = "pri" const SecScName = "sec" vdb.Spec.Subclusters = []vapi.Subcluster{ - {Name: PriScName, IsPrimary: true}, - {Name: SecScName, IsPrimary: false}, + {Name: PriScName, IsPrimary: true, Size: 1}, + {Name: SecScName, IsPrimary: false, Size: 1}, } vdb.Spec.Image = OldImage test.CreateVDB(ctx, k8sClient, vdb) @@ -257,8 +269,9 @@ var _ = Describe("onlineupgrade_reconcile", func() { test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) defer test.DeletePods(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) sts := &appsv1.StatefulSet{} Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed()) @@ -276,13 +289,13 @@ var _ = Describe("onlineupgrade_reconcile", func() { It("should update image in each sts", func() { vdb := vapi.MakeVDB() - const PriScName = "pri" - const SecScName = "sec" + const Pri1ScName = "pri1" + const Pri2ScName = "pri2" vdb.Spec.Subclusters = []vapi.Subcluster{ - {Name: PriScName, IsPrimary: true}, - {Name: SecScName, IsPrimary: false}, + {Name: Pri1ScName, IsPrimary: true, Size: 1}, + {Name: Pri2ScName, IsPrimary: true, Size: 1}, } - vdb.Spec.TemporarySubclusterRouting.Names = []string{SecScName, PriScName} + vdb.Spec.TemporarySubclusterRouting.Names = []string{Pri2ScName, Pri1ScName} vdb.Spec.Image = OldImage vdb.Spec.UpgradePolicy = vapi.OnlineUpgrade vdb.Spec.IgnoreUpgradePath = true @@ -291,12 +304,18 @@ var _ = Describe("onlineupgrade_reconcile", func() { defer test.DeleteVDB(ctx, k8sClient, vdb) test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) defer test.DeletePods(ctx, k8sClient, vdb) + test.CreateSvcs(ctx, k8sClient, vdb) + defer test.DeleteSvcs(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) - Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) + r := createOnlineUpgradeReconciler(ctx, vdb) + // The reconcile will requeue when it waits for pods to come online that + // may need a restart. It would have gotten far enough to update the + // sts for the primaries. 
+ Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal( + ctrl.Result{Requeue: false, RequeueAfter: vdb.GetUpgradeRequeueTime()})) sts := &appsv1.StatefulSet{} Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed()) @@ -320,10 +339,11 @@ var _ = Describe("onlineupgrade_reconcile", func() { defer test.DeletePods(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{Requeue: false, RequeueAfter: vdb.GetUpgradeRequeueTime()})) Expect(vdb.Status.UpgradeStatus).Should(Equal("Checking if new version is compatible")) }) @@ -344,7 +364,7 @@ var _ = Describe("onlineupgrade_reconcile", func() { vdb.Spec.Image = NewImageName // Trigger an upgrade Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) pn := names.GenPodName(vdb, sc, 0) Expect(r.PFacts.Collect(ctx, vdb)).Should(Succeed()) r.PFacts.Detail[pn].upNode = true @@ -377,14 +397,13 @@ var _ = Describe("onlineupgrade_reconcile", func() { defer test.DeletePods(ctx, k8sClient, vdb) vdb.Spec.Image = NewImageName // Trigger an upgrade - Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{Requeue: false, RequeueAfter: (time.Second * 100)})) }) - It("should return transient if doing online upgrade and transient isn't created yet", func() { + It("should return transient in the finder if doing online upgrade", func() { vdb := vapi.MakeVDB() const ScName = "sc1" const TransientScName = "a-transient" @@ -406,31 +425,22 @@ var _ = Describe("onlineupgrade_reconcile", func() { transientSc := vdb.BuildTransientSubcluster("") vdb.Spec.Image = NewImageName // Trigger an upgrade + Expect(k8sClient.Update(ctx, vdb)).Should(Succeed()) - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) Expect(r.Manager.startUpgrade(ctx)).Should(Equal(ctrl.Result{})) + Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{})) + Expect(r.addTransientToVdb(ctx)).Should(Equal(ctrl.Result{})) + Expect(r.createTransientSts(ctx)).Should(Equal(ctrl.Result{})) - // Confirm transient doesn't exist + // Confirm transient exists sts := &appsv1.StatefulSet{} - Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), sts)).ShouldNot(Succeed()) + Expect(k8sClient.Get(ctx, names.GenStsName(vdb, transientSc), sts)).Should(Succeed()) - // Confirm it gets returned from the finder scs, err := r.Finder.FindSubclusters(ctx, iter.FindAll|iter.FindSorted) Expect(err).Should(Succeed()) Expect(len(scs)).Should(Equal(2)) Expect(scs[0].Name).Should(Equal(TransientScName)) - Expect(scs[0].Size).Should(Equal(int32(1))) - Expect(scs[1].Name).Should(Equal(ScName)) - - // Create transient and make sure finder only returns one instance of - // the transient - test.CreateSts(ctx, k8sClient, vdb, transientSc, 1, 0, test.AllPodsRunning) - defer test.DeleteSts(ctx, k8sClient, vdb, transientSc, 1) - - scs, err = r.Finder.FindSubclusters(ctx, iter.FindAll|iter.FindSorted) - Expect(err).Should(Succeed()) - Expect(len(scs)).Should(Equal(2)) - Expect(scs[0].Name).Should(Equal(TransientScName)) 
Expect(scs[1].Name).Should(Equal(ScName)) }) @@ -442,7 +452,7 @@ var _ = Describe("onlineupgrade_reconcile", func() { {Name: PriScName, IsPrimary: true, Size: 1}, } - r := createOnlineUpgradeReconciler(vdb) + r := createOnlineUpgradeReconciler(ctx, vdb) scMap := vdb.GenSubclusterMap() routingSc := r.getSubclusterForTemporaryRouting(ctx, &vdb.Spec.Subclusters[0], scMap) Expect(routingSc.Name).Should(Equal(PriScName)) @@ -460,9 +470,17 @@ var _ = Describe("onlineupgrade_reconcile", func() { }) // createOnlineUpgradeReconciler is a helper to run the OnlineUpgradeReconciler. -func createOnlineUpgradeReconciler(vdb *vapi.VerticaDB) *OnlineUpgradeReconciler { +func createOnlineUpgradeReconciler(ctx context.Context, vdb *vapi.VerticaDB) *OnlineUpgradeReconciler { fpr := &cmds.FakePodRunner{Results: cmds.CmdResults{}} pfacts := MakePodFacts(k8sClient, fpr) actor := MakeOnlineUpgradeReconciler(vdbRec, logger, vdb, fpr, &pfacts) - return actor.(*OnlineUpgradeReconciler) + r := actor.(*OnlineUpgradeReconciler) + + // Ensure one pod is up so that we can do an online upgrade + Expect(r.PFacts.Collect(ctx, vdb)).Should(Succeed()) + pn := names.GenPodName(vdb, &vdb.Spec.Subclusters[0], 0) + r.PFacts.Detail[pn].upNode = true + r.PFacts.Detail[pn].readOnly = false + + return r } diff --git a/pkg/controllers/onlineupgrade_reconciler.go b/pkg/controllers/onlineupgrade_reconciler.go index 9c9a0c89c..553026cd7 100644 --- a/pkg/controllers/onlineupgrade_reconciler.go +++ b/pkg/controllers/onlineupgrade_reconciler.go @@ -20,7 +20,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/go-logr/logr" vapi "github.com/vertica/vertica-kubernetes/api/v1beta1" @@ -32,6 +31,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" ) @@ -40,7 +40,8 @@ import ( type OnlineUpgradeReconciler struct { VRec *VerticaDBReconciler Log logr.Logger - Vdb *vapi.VerticaDB // Vdb is the CRD we are acting on. + Vdb *vapi.VerticaDB // Vdb is the CRD we are acting on. + TransientSc *vapi.Subcluster // Set to the transient subcluster if applicable PRunner cmds.PodRunner PFacts *PodFacts Finder iter.SubclusterFinder @@ -77,17 +78,21 @@ func (o *OnlineUpgradeReconciler) Reconcile(ctx context.Context, req *ctrl.Reque // Setup a transient subcluster to accept traffic when other subclusters // are down o.postNextStatusMsg, + o.addTransientToVdb, o.createTransientSts, o.installTransientNodes, o.addTransientSubcluster, o.addTransientNodes, - o.waitForReadyTransientPod, + o.rebalanceTransientNodes, + o.addClientRoutingLabelToTransientNodes, // Handle restart of the primary subclusters o.restartPrimaries, // Handle restart of secondary subclusters o.restartSecondaries, // Will cleanup the transient subcluster now that the primaries are back up. o.postNextStatusMsg, + o.removeTransientFromVdb, + o.removeClientRoutingLabelFromTransientNodes, o.removeTransientSubclusters, o.uninstallTransientNodes, o.deleteTransientSts, @@ -117,6 +122,8 @@ func (o *OnlineUpgradeReconciler) loadSubclusterState(ctx context.Context) (ctrl return ctrl.Result{}, err } + o.TransientSc = o.Vdb.FindTransientSubcluster() + err = o.cachePrimaryImages(ctx) return ctrl.Result{}, err } @@ -165,6 +172,56 @@ func (o *OnlineUpgradeReconciler) postNextStatusMsgForSts(ctx context.Context, s return o.postNextStatusMsg(ctx) } +// addTransientToVdb will add the transient subcluster to the VerticaDB. This +// is stored in the api server. 
It will get removed at the end of the +// upgrade. +func (o *OnlineUpgradeReconciler) addTransientToVdb(ctx context.Context) (ctrl.Result, error) { + if o.TransientSc != nil { + return ctrl.Result{}, nil + } + + if o.skipTransientSetup() { + return ctrl.Result{}, nil + } + + oldImage, ok := o.fetchOldImage() + if !ok { + return ctrl.Result{}, fmt.Errorf("could not determine the old image name. "+ + "Only available image is %s", o.Vdb.Spec.Image) + } + + transientSc := o.Vdb.BuildTransientSubcluster(oldImage) + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // Always fetch the latest to minimize the chance of getting a conflict error. + nm := o.Vdb.ExtractNamespacedName() + if err := o.VRec.Client.Get(ctx, nm, o.Vdb); err != nil { + return err + } + + // Ensure we only have at most one transient subcluster + if otherSc := o.Vdb.FindTransientSubcluster(); otherSc != nil { + o.Log.Info("Transient subcluster already exists. Skip adding another one", + "name", otherSc.Name) + o.TransientSc = otherSc // Ensure we cache the one we found + return nil + } + + o.Vdb.Spec.Subclusters = append(o.Vdb.Spec.Subclusters, *transientSc) + o.TransientSc = &o.Vdb.Spec.Subclusters[len(o.Vdb.Spec.Subclusters)-1] + err := o.VRec.Client.Update(ctx, o.Vdb) + if err != nil { + return err + } + + // Refresh things now that vdb has changed + o.PFacts.Invalidate() + o.Finder = iter.MakeSubclusterFinder(o.VRec.Client, o.Vdb) + return nil + }) + return ctrl.Result{}, err +} + // createTransientSts this will create a secondary subcluster to accept // traffic from subclusters when they are down. This subcluster is called // the transient and only exist for the life of the upgrade. @@ -177,14 +234,7 @@ func (o *OnlineUpgradeReconciler) createTransientSts(ctx context.Context) (ctrl. o.traceActorReconcile(actor) or := actor.(*ObjReconciler) - oldImage, ok := o.fetchOldImage() - if !ok { - return ctrl.Result{}, fmt.Errorf("could not determine the old image name. "+ - "Only available image is %s", o.Vdb.Spec.Image) - } - - sc := o.Vdb.BuildTransientSubcluster(oldImage) - return or.reconcileSts(ctx, sc) + return or.reconcileSts(ctx, o.TransientSc) } // installTransientNodes will ensure we have installed vertica on @@ -211,7 +261,7 @@ func (o *OnlineUpgradeReconciler) addTransientSubcluster(ctx context.Context) (c return ctrl.Result{}, err } d := actor.(*DBAddSubclusterReconciler) - return d.addMissingSubclusters(ctx, []vapi.Subcluster{*o.Vdb.BuildTransientSubcluster("")}) + return d.addMissingSubclusters(ctx, []vapi.Subcluster{*o.TransientSc}) } // addTransientNodes will ensure nodes on the transient have been added to the @@ -227,44 +277,32 @@ func (o *OnlineUpgradeReconciler) addTransientNodes(ctx context.Context) (ctrl.R return ctrl.Result{}, err } d := actor.(*DBAddNodeReconciler) - return d.reconcileSubcluster(ctx, o.Vdb.BuildTransientSubcluster("")) + return d.reconcileSubcluster(ctx, o.TransientSc) } -// waitForReadyTransientPod will wait for one of the transient pods to be ready. -// This is done so that when we direct traffic to the transient subcluster the -// service object has a pod to route too. 
-func (o *OnlineUpgradeReconciler) waitForReadyTransientPod(ctx context.Context) (ctrl.Result, error) { +// rebalanceTransientNodes will run a rebalance against the transient subcluster +func (o *OnlineUpgradeReconciler) rebalanceTransientNodes(ctx context.Context) (ctrl.Result, error) { if o.skipTransientSetup() { return ctrl.Result{}, nil } - pod := &corev1.Pod{} - sc := o.Vdb.BuildTransientSubcluster("") - // We only check the first pod is ready - pn := names.GenPodName(o.Vdb, sc, 0) - - const MaxAttempts = 30 // Retry for roughly 30 seconds - for i := 0; i < MaxAttempts; i++ { - if err := o.VRec.Client.Get(ctx, pn, pod); err != nil { - // Any error, including not found, aborts the retry. The pod should - // have already existed because we call this after db add node. The - // transient pod is not restartable, so if the pod isn't running, - // then it won't ever be ready. - o.Log.Info("Error while fetching transient pod", "err", err) - return ctrl.Result{}, nil - } - if pod.Status.ContainerStatuses[ServerContainerIndex].Ready { - o.Log.Info("Transient pod is in ready state", - "containerStatuses", pod.Status.ContainerStatuses[ServerContainerIndex]) - return ctrl.Result{}, nil - } - const AttemptSleepTime = 1 - time.Sleep(AttemptSleepTime * time.Second) + actor := MakeRebalanceShardsReconciler(o.VRec, o.Log, o.Vdb, o.PRunner, o.PFacts, o.TransientSc.Name) + o.traceActorReconcile(actor) + return actor.Reconcile(ctx, &ctrl.Request{}) +} + +// addClientRoutingLabelToTransientNodes will add the special routing label so +// that Service objects can use the transient subcluster. +func (o *OnlineUpgradeReconciler) addClientRoutingLabelToTransientNodes(ctx context.Context) (ctrl.Result, error) { + if o.skipTransientSetup() { + return ctrl.Result{}, nil } - // If we timeout, we still continue on. The transient pod is not - // restartable, so we don't want to wait indefinitely. The upgrade - // will proceed but any routing to the transient pods will fail. - return ctrl.Result{}, nil + + actor := MakeClientRoutingLabelReconciler(o.VRec, o.Vdb, o.PFacts, AddNodeApplyMethod, o.TransientSc.Name) + o.traceActorReconcile(actor) + // Add the labels. If there is a node that still has missing subscriptions + // that will fail with requeue error. + return actor.Reconcile(ctx, &ctrl.Request{}) } // iterateSubclusterType will iterate over the subclusters, calling the @@ -448,16 +486,69 @@ func (o *OnlineUpgradeReconciler) bringSubclusterOnline(ctx context.Context, sts } scName := sts.Labels[builder.SubclusterNameLabel] + + actor = MakeClientRoutingLabelReconciler(o.VRec, o.Vdb, o.PFacts, PodRescheduleApplyMethod, scName) + res, err = actor.Reconcile(ctx, &ctrl.Request{}) + if verrors.IsReconcileAborted(res, err) { + return res, err + } + o.Log.Info("starting client traffic routing back to subcluster", "name", scName) err = o.routeClientTraffic(ctx, scName, false) return ctrl.Result{}, err } -// removeTransientSubclusters will drive subcluster removal of the transient subcluster +// removeTransientFromVdb will remove the transient subcluster that is in the VerticaDB stored in the apiserver +func (o *OnlineUpgradeReconciler) removeTransientFromVdb(ctx context.Context) (ctrl.Result, error) { + if !o.Vdb.RequiresTransientSubcluster() { + return ctrl.Result{}, nil + } + + o.Log.Info("starting removal of transient from VerticaDB") + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // Always fetch the latest to minimize the chance of getting a conflict error. 
+		nm := o.Vdb.ExtractNamespacedName()
+		if err := o.VRec.Client.Get(ctx, nm, o.Vdb); err != nil {
+			return err
+		}
+
+		// Remove the transient.
+		removedTransient := false
+		for i := len(o.Vdb.Spec.Subclusters) - 1; i >= 0; i-- {
+			if o.Vdb.Spec.Subclusters[i].IsTransient {
+				o.Vdb.Spec.Subclusters = append(o.Vdb.Spec.Subclusters[:i], o.Vdb.Spec.Subclusters[i+1:]...)
+				removedTransient = true
+			}
+		}
+		if !removedTransient {
+			return nil
+		}
+		o.PFacts.Invalidate() // Force refresh due to transient being removed
+		o.TransientSc = nil
+		return o.VRec.Client.Update(ctx, o.Vdb)
+	})
+	return ctrl.Result{}, err
+}
+
+// removeClientRoutingLabelFromTransientNodes will remove the special routing
+// label since we are about to remove that subcluster
+func (o *OnlineUpgradeReconciler) removeClientRoutingLabelFromTransientNodes(ctx context.Context) (ctrl.Result, error) {
+	if !o.Vdb.RequiresTransientSubcluster() {
+		return ctrl.Result{}, nil
+	}
+
+	actor := MakeClientRoutingLabelReconciler(o.VRec, o.Vdb, o.PFacts, DelNodeApplyMethod, "")
+	o.traceActorReconcile(actor)
+	return actor.Reconcile(ctx, &ctrl.Request{})
+}
+
+// removeTransientSubclusters will drive subcluster removal of the transient subcluster.
 func (o *OnlineUpgradeReconciler) removeTransientSubclusters(ctx context.Context) (ctrl.Result, error) {
 	if !o.Vdb.RequiresTransientSubcluster() {
 		return ctrl.Result{}, nil
 	}
+
 	actor := MakeDBRemoveSubclusterReconciler(o.VRec, o.Log, o.Vdb, o.PRunner, o.PFacts)
 	o.traceActorReconcile(actor)
 	return actor.Reconcile(ctx, &ctrl.Request{})
@@ -592,10 +683,8 @@ func (o *OnlineUpgradeReconciler) routeClientTraffic(ctx context.Context,
 // temporary routing. If no routing decision could be made, this will return nil.
 func (o *OnlineUpgradeReconciler) getSubclusterForTemporaryRouting(ctx context.Context,
 	offlineSc *vapi.Subcluster, scMap map[string]*vapi.Subcluster) *vapi.Subcluster {
-	if o.Vdb.RequiresTransientSubcluster() {
-		// We are modifying a copy of sc, so we set the IsTransient flag to
-		// know what subcluster we are going to route to.
-		transientSc := o.Vdb.BuildTransientSubcluster("")
+	if o.TransientSc != nil {
+		transientSc := o.TransientSc
 
 		// Only continue if the transient subcluster exists. It may not
 		// exist if the entire cluster was down when we attempted to create it.
@@ -703,5 +792,7 @@ func (o *OnlineUpgradeReconciler) isSubclusterIdle(ctx context.Context, scName s
 func (o *OnlineUpgradeReconciler) doesScHaveActiveConnections(stdout string) bool {
 	lines := strings.Split(stdout, "\n")
 	res := strings.Trim(lines[0], " ")
-	return res != "0"
+	// As a convenience for tests, allow an empty string to be treated as having
+	// no active connections.
+	return res != "" && res != "0"
 }
diff --git a/pkg/controllers/podfacts.go b/pkg/controllers/podfacts.go
index 1084f5473..bf61991b4 100644
--- a/pkg/controllers/podfacts.go
+++ b/pkg/controllers/podfacts.go
@@ -71,6 +71,13 @@ type PodFact struct {
 	// (b) statefulset exists but it isn't sized to include this pod yet.
 	managedByParent bool
 
+	// true means the pod is scheduled for deletion. This can happen if the
+	// size of the subcluster has shrunk in the VerticaDB but the pod still
+	// exists and is managed by a statefulset. The pod is pending delete in
+	// the sense that, once the statefulset is resized to match the subcluster,
+	// the pod will get deleted.
+	pendingDelete bool
+
 	// Have we run install for this pod? None means we are unable to determine
 	// whether it is run.
isInstalled tristate.TriState @@ -111,7 +118,8 @@ type PodFact struct { // True if this pod is for a transient subcluster created for online upgrade isTransient bool - // The number of shards this node has subscribed to + // The number of shards this node has subscribed to, not including the + // special replica shard that has unsegmented projections. shardSubscriptions int } @@ -211,6 +219,7 @@ func (p *PodFacts) collectPodByStsIndex(ctx context.Context, vdb *vapi.VerticaDB pf.dnsName = pod.Spec.Hostname + "." + pod.Spec.Subdomain pf.podIP = pod.Status.PodIP pf.isTransient, _ = strconv.ParseBool(pod.Labels[builder.SubclusterTransientLabel]) + pf.pendingDelete = podIndex >= sc.Size pf.image = pod.Spec.Containers[ServerContainerIndex].Image fns := []func(ctx context.Context, vdb *vapi.VerticaDB, pf *PodFact) error{ @@ -337,7 +346,7 @@ func (p *PodFacts) checkShardSubscriptions(ctx context.Context, vdb *vapi.Vertic } cmd := []string{ "-tAc", - fmt.Sprintf("select count(*) from v_catalog.node_subscriptions where node_name = '%s'", + fmt.Sprintf("select count(*) from v_catalog.node_subscriptions where node_name = '%s' and shard_name != 'replica'", pf.vnodeName), } stdout, stderr, err := p.PRunner.ExecVSQL(ctx, pf.name, names.ServerContainer, cmd...) diff --git a/pkg/controllers/rebalanceshards_reconcile.go b/pkg/controllers/rebalanceshards_reconcile.go index dde53a292..cf129d3fc 100644 --- a/pkg/controllers/rebalanceshards_reconcile.go +++ b/pkg/controllers/rebalanceshards_reconcile.go @@ -35,12 +35,20 @@ type RebalanceShardsReconciler struct { Vdb *vapi.VerticaDB // Vdb is the CRD we are acting on. PRunner cmds.PodRunner PFacts *PodFacts + ScName string // Name of the subcluster to rebalance. Leave this blank if you want to handle all subclusters. } // MakeRebalanceShardsReconciler will build a RebalanceShardsReconciler object func MakeRebalanceShardsReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger, - vdb *vapi.VerticaDB, prunner cmds.PodRunner, pfacts *PodFacts) ReconcileActor { - return &RebalanceShardsReconciler{VRec: vdbrecon, Log: log, Vdb: vdb, PRunner: prunner, PFacts: pfacts} + vdb *vapi.VerticaDB, prunner cmds.PodRunner, pfacts *PodFacts, scName string) ReconcileActor { + return &RebalanceShardsReconciler{ + VRec: vdbrecon, + Log: log, + Vdb: vdb, + PRunner: prunner, + PFacts: pfacts, + ScName: scName, + } } // Reconcile will ensure each node has at least one shard subscription @@ -64,6 +72,7 @@ func (s *RebalanceShardsReconciler) Reconcile(ctx context.Context, req *ctrl.Req if err := s.rebalanceShards(ctx, atPod, scToRebalance[i]); err != nil { return ctrl.Result{}, err } + s.PFacts.Invalidate() // Refresh due to shard subscriptions have changed } return ctrl.Result{}, nil @@ -76,7 +85,7 @@ func (s *RebalanceShardsReconciler) findShardsToRebalance() []string { scToRebalance := []string{} for _, pf := range s.PFacts.Detail { - if pf.isPodRunning && pf.upNode && pf.shardSubscriptions == 0 { + if (s.ScName == "" || s.ScName == pf.subcluster) && pf.isPodRunning && pf.upNode && pf.shardSubscriptions == 0 { _, ok := scRebalanceMap[pf.subcluster] if !ok { scToRebalance = append(scToRebalance, pf.subcluster) diff --git a/pkg/controllers/rebalanceshards_reconciler_test.go b/pkg/controllers/rebalanceshards_reconciler_test.go index 046e6a230..ba593d3eb 100644 --- a/pkg/controllers/rebalanceshards_reconciler_test.go +++ b/pkg/controllers/rebalanceshards_reconciler_test.go @@ -48,11 +48,36 @@ var _ = Describe("rebalanceshards_reconcile", func() { pfn = names.GenPodName(vdb, 
&vdb.Spec.Subclusters[1], 0) pfacts.Detail[pfn].shardSubscriptions = 3 pfacts.Detail[pfn].upNode = true - r := MakeRebalanceShardsReconciler(vdbRec, logger, vdb, fpr, &pfacts) + r := MakeRebalanceShardsReconciler(vdbRec, logger, vdb, fpr, &pfacts, "") Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) atCmd := fpr.FindCommands("select rebalance_shards('sc1')") Expect(len(atCmd)).Should(Equal(1)) atCmd = fpr.FindCommands("select rebalance_shards('sc2')") Expect(len(atCmd)).Should(Equal(0)) }) + + It("should only run rebalance shards against specified subcluster ", func() { + vdb := vapi.MakeVDB() + vdb.Spec.Subclusters = []vapi.Subcluster{ + {Name: "sc1", Size: 1}, + {Name: "sc2", Size: 1}, + } + test.CreatePods(ctx, k8sClient, vdb, test.AllPodsRunning) + defer test.DeletePods(ctx, k8sClient, vdb) + + fpr := &cmds.FakePodRunner{} + pfacts := MakePodFacts(k8sClient, fpr) + Expect(pfacts.Collect(ctx, vdb)).Should(Succeed()) + for i := range vdb.Spec.Subclusters { + pn := names.GenPodName(vdb, &vdb.Spec.Subclusters[i], 0) + pfacts.Detail[pn].upNode = true + pfacts.Detail[pn].shardSubscriptions = 0 + } + r := MakeRebalanceShardsReconciler(vdbRec, logger, vdb, fpr, &pfacts, vdb.Spec.Subclusters[1].Name) + Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{})) + atCmd := fpr.FindCommands("select rebalance_shards('sc1')") + Expect(len(atCmd)).Should(Equal(0)) + atCmd = fpr.FindCommands("select rebalance_shards('sc2')") + Expect(len(atCmd)).Should(Equal(1)) + }) }) diff --git a/pkg/controllers/version_reconciler.go b/pkg/controllers/version_reconciler.go index b414924a9..79e43c168 100644 --- a/pkg/controllers/version_reconciler.go +++ b/pkg/controllers/version_reconciler.go @@ -41,7 +41,7 @@ type VersionReconciler struct { FindPodFunc func() (*PodFact, bool) // Function to call to find pod } -// MakeVersionReconciler will build a VersinReconciler object +// MakeVersionReconciler will build a VersionReconciler object func MakeVersionReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger, vdb *vapi.VerticaDB, prunner cmds.PodRunner, pfacts *PodFacts, enforceUpgradePath bool) ReconcileActor { diff --git a/pkg/controllers/verticadb_controller.go b/pkg/controllers/verticadb_controller.go index 1f355c4c1..42abb333c 100644 --- a/pkg/controllers/verticadb_controller.go +++ b/pkg/controllers/verticadb_controller.go @@ -53,7 +53,7 @@ type VerticaDBReconciler struct { //+kubebuilder:rbac:groups=vertica.com,namespace=WATCH_NAMESPACE,resources=verticadbs/finalizers,verbs=update // +kubebuilder:rbac:groups=core,namespace=WATCH_NAMESPACE,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,namespace=WATCH_NAMESPACE,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups="",namespace=WATCH_NAMESPACE,resources=pods,verbs=get;list;watch;create;update;delete +// +kubebuilder:rbac:groups="",namespace=WATCH_NAMESPACE,resources=pods,verbs=get;list;watch;create;update;delete;patch // +kubebuilder:rbac:groups="",namespace=WATCH_NAMESPACE,resources=pods/exec,verbs=create // +kubebuilder:rbac:groups=core,namespace=WATCH_NAMESPACE,resources=secrets,verbs=get;list;watch @@ -102,76 +102,93 @@ func (r *VerticaDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( pfacts := MakePodFacts(r.Client, prunner) var res ctrl.Result + // Iterate over each actor + actors := r.constructActors(log, vdb, prunner, &pfacts) + for _, act := range actors { + log.Info("starting actor", "name", 
fmt.Sprintf("%T", act)) + res, err = act.Reconcile(ctx, &req) + // Error or a request to requeue will stop the reconciliation. + if verrors.IsReconcileAborted(res, err) { + // Handle requeue time priority. + // If any function needs a requeue and we have a RequeueTime set, + // then overwrite RequeueAfter. + // Functions such as Upgrade may already set RequeueAfter and Requeue to false + if (res.Requeue || res.RequeueAfter > 0) && vdb.Spec.RequeueTime > 0 { + res.Requeue = false + res.RequeueAfter = time.Duration(vdb.Spec.RequeueTime) + } + log.Info("aborting reconcile of VerticaDB", "result", res, "err", err) + return res, err + } + } + + log.Info("ending reconcile of VerticaDB", "result", res, "err", err) + return res, err +} + +// constructActors will a list of actors that should be run for the reconcile. +// Order matters in that some actors depend on the successeful execution of +// earlier ones. +func (r *VerticaDBReconciler) constructActors(log logr.Logger, vdb *vapi.VerticaDB, prunner *cmds.ClusterPodRunner, + pfacts *PodFacts) []ReconcileActor { // The actors that will be applied, in sequence, to reconcile a vdb. // Note, we run the StatusReconciler multiple times. This allows us to // refresh the status of the vdb as we do operations that affect it. - actors := []ReconcileActor{ + return []ReconcileActor{ // Always start with a status reconcile in case the prior reconcile failed. - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Handle upgrade actions for any k8s objects created in prior versions // of the operator. MakeUpgradeOperator120Reconciler(r, log, vdb), // Handles vertica server upgrade (i.e., when spec.image changes) - MakeOfflineUpgradeReconciler(r, log, vdb, prunner, &pfacts), - MakeOnlineUpgradeReconciler(r, log, vdb, prunner, &pfacts), + MakeOfflineUpgradeReconciler(r, log, vdb, prunner, pfacts), + MakeOnlineUpgradeReconciler(r, log, vdb, prunner, pfacts), // Handles restart + re_ip of vertica - MakeRestartReconciler(r, log, vdb, prunner, &pfacts, true), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeRestartReconciler(r, log, vdb, prunner, pfacts, true), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), + // Ensure we add labels to any pod rescheduled so that Service objects route traffic to it. + MakeClientRoutingLabelReconciler(r, vdb, pfacts, PodRescheduleApplyMethod, ""), + // Remove Service label for any pods that are pending delete. This will + // cause the Service object to stop routing traffic to them. 
+ MakeClientRoutingLabelReconciler(r, vdb, pfacts, DelNodeApplyMethod, ""), // Handles calls to admintools -t db_remove_subcluster - MakeDBRemoveSubclusterReconciler(r, log, vdb, prunner, &pfacts), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeDBRemoveSubclusterReconciler(r, log, vdb, prunner, pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Handles calls to admintools -t db_remove_node - MakeDBRemoveNodeReconciler(r, log, vdb, prunner, &pfacts), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeDBRemoveNodeReconciler(r, log, vdb, prunner, pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Handle calls to remove hosts from admintools.conf - MakeUninstallReconciler(r, log, vdb, prunner, &pfacts), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeUninstallReconciler(r, log, vdb, prunner, pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Creates or updates any k8s objects the CRD creates. This includes any // statefulsets and service objects. - MakeObjReconciler(r, log, vdb, &pfacts), + MakeObjReconciler(r, log, vdb, pfacts), // Set version info in the annotations and check that it is the minimum - MakeVersionReconciler(r, log, vdb, prunner, &pfacts, false), + MakeVersionReconciler(r, log, vdb, prunner, pfacts, false), // Handle calls to add hosts to admintools.conf - MakeInstallReconciler(r, log, vdb, prunner, &pfacts), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeInstallReconciler(r, log, vdb, prunner, pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Handle calls to admintools -t create_db - MakeCreateDBReconciler(r, log, vdb, prunner, &pfacts), + MakeCreateDBReconciler(r, log, vdb, prunner, pfacts), // Handle calls to admintools -t revive_db - MakeReviveDBReconciler(r, log, vdb, prunner, &pfacts), + MakeReviveDBReconciler(r, log, vdb, prunner, pfacts), // Create and revive are mutually exclusive, so this handles // status updates after both of them. - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), + // Update the labels in pods so that Service objects route traffic to them. + MakeClientRoutingLabelReconciler(r, vdb, pfacts, AddNodeApplyMethod, ""), // Handle calls to admintools -t db_add_subcluster - MakeDBAddSubclusterReconciler(r, log, vdb, prunner, &pfacts), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeDBAddSubclusterReconciler(r, log, vdb, prunner, pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Handle calls to admintools -t db_add_node - MakeDBAddNodeReconciler(r, log, vdb, prunner, &pfacts), - MakeStatusReconciler(r.Client, r.Scheme, log, vdb, &pfacts), + MakeDBAddNodeReconciler(r, log, vdb, prunner, pfacts), + MakeStatusReconciler(r.Client, r.Scheme, log, vdb, pfacts), // Handle calls to rebalance_shards - MakeRebalanceShardsReconciler(r, log, vdb, prunner, &pfacts), + MakeRebalanceShardsReconciler(r, log, vdb, prunner, pfacts, "" /* all subclusters */), + // Update the label in pods so that Service objects route to them once they + // have finished being rebalanced. + MakeClientRoutingLabelReconciler(r, vdb, pfacts, AddNodeApplyMethod, ""), } - - // Iterate over each actor - for _, act := range actors { - log.Info("starting actor", "name", fmt.Sprintf("%T", act)) - res, err = act.Reconcile(ctx, &req) - // Error or a request to requeue will stop the reconciliation.
- if verrors.IsReconcileAborted(res, err) { - // Handle requeue time priority. - // If any function needs a requeue and we have a RequeueTime set, - // then overwrite RequeueAfter. - // Functions such as Upgrade may already set RequeueAfter and Requeue to false - if (res.Requeue || res.RequeueAfter > 0) && vdb.Spec.RequeueTime > 0 { - res.Requeue = false - res.RequeueAfter = time.Second * time.Duration(vdb.Spec.RequeueTime) - } - log.Info("aborting reconcile of VerticaDB", "result", res, "err", err) - return res, err - } - } - - log.Info("ending reconcile of VerticaDB", "result", res, "err", err) - return res, err } // GetSuperuserPassword returns the superuser password if it has been provided diff --git a/pkg/iter/sc_finder.go b/pkg/iter/sc_finder.go index d83bc8bd0..51554ac9f 100644 --- a/pkg/iter/sc_finder.go +++ b/pkg/iter/sc_finder.go @@ -126,20 +126,11 @@ func (m *SubclusterFinder) FindSubclusters(ctx context.Context, flags FindFlags) } // We will convert each statefulset into a vapi.Subcluster stub object. We - // only fill in the name. + // only fill in the name. Size is intentionally left zero as this is an + // indication the subcluster is being removed. for i := range missingSts.Items { scName := missingSts.Items[i].Labels[builder.SubclusterNameLabel] - subclusters = append(subclusters, &vapi.Subcluster{Name: scName}) - } - } - - // We include the transient if it should exist based on online image in - // progress state. This is added after we fetch any subcluster from the - // statefulset lookup. This prevents us from including it twice. - if flags&FindInVdb != 0 && m.Vdb.RequiresTransientSubcluster() && m.Vdb.IsOnlineUpgradeInProgress() { - transient := m.Vdb.BuildTransientSubcluster("") - if !isSubclusterInSlice(transient, subclusters) { - subclusters = append(subclusters, transient) + subclusters = append(subclusters, &vapi.Subcluster{Name: scName, Size: 0}) } } @@ -239,13 +230,3 @@ func getLabelsFromObject(obj runtime.Object) (map[string]string, bool) { } return nil, false } - -// isSubclusterInSlice return true if the given subcluster is in the subcluster slice -func isSubclusterInSlice(subcluster *vapi.Subcluster, subclusters []*vapi.Subcluster) bool { - for i := range subclusters { - if subclusters[i].Name == subcluster.Name { - return true - } - } - return false -} diff --git a/tests/e2e-online-upgrade/online-upgrade-and-scale-up/20-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-and-scale-up/20-assert.yaml index e55d330c3..a8b8ce292 100644 --- a/tests/e2e-online-upgrade/online-upgrade-and-scale-up/20-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-and-scale-up/20-assert.yaml @@ -37,6 +37,7 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-name: transient --- apiVersion: apps/v1 diff --git a/tests/e2e-online-upgrade/online-upgrade-down-cluster/35-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-down-cluster/35-assert.yaml index 84de39415..02b1f130e 100644 --- a/tests/e2e-online-upgrade/online-upgrade-down-cluster/35-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-down-cluster/35-assert.yaml @@ -28,4 +28,4 @@ status: status: "False" - type: OnlineUpgradeInProgress status: "True" - subclusterCount: 2 + subclusterCount: 1 diff --git a/tests/e2e-online-upgrade/online-upgrade-drain/50-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-drain/50-assert.yaml index c2e99e667..47a73ce3d 100644 --- a/tests/e2e-online-upgrade/online-upgrade-drain/50-assert.yaml +++ 
b/tests/e2e-online-upgrade/online-upgrade-drain/50-assert.yaml @@ -26,6 +26,7 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-name: sec1 + vertica.com/client-routing: "true" --- apiVersion: v1 kind: Service @@ -35,6 +36,7 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-svc: pri1 + vertica.com/client-routing: "true" --- apiVersion: v1 kind: Service @@ -44,3 +46,4 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-svc: pri2 + vertica.com/client-routing: "true" diff --git a/tests/e2e-online-upgrade/online-upgrade-kill-transient/30-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-kill-transient/30-assert.yaml index b3be8a13f..79c818ed0 100644 --- a/tests/e2e-online-upgrade/online-upgrade-kill-transient/30-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-kill-transient/30-assert.yaml @@ -25,6 +25,7 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-name: transient --- apiVersion: v1 @@ -34,4 +35,5 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-svc: sec diff --git a/tests/e2e-online-upgrade/online-upgrade-kill-transient/40-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-kill-transient/40-assert.yaml index 7d17bb960..6fc480263 100644 --- a/tests/e2e-online-upgrade/online-upgrade-kill-transient/40-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-kill-transient/40-assert.yaml @@ -25,6 +25,7 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-name: transient --- apiVersion: v1 @@ -34,4 +35,5 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-svc: pri diff --git a/tests/e2e-online-upgrade/online-upgrade-no-transient-1-sc/25-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-no-transient-1-sc/25-assert.yaml index 7ed04152f..036dff32e 100644 --- a/tests/e2e-online-upgrade/online-upgrade-no-transient-1-sc/25-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-no-transient-1-sc/25-assert.yaml @@ -38,3 +38,4 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-name: pri + vertica.com/client-routing: "true" diff --git a/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/30-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/30-assert.yaml index 123292308..511b6a9bf 100644 --- a/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/30-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/30-assert.yaml @@ -25,6 +25,7 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-name: sec --- apiVersion: v1 @@ -34,4 +35,5 @@ metadata: spec: selector: app.kubernetes.io/instance: v-base-upgrade + vertica.com/client-routing: "true" vertica.com/subcluster-svc: sec diff --git a/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/40-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/40-assert.yaml index 610b7ecfa..e5cf56ea4 100644 --- a/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/40-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-no-transient-2-sc/40-assert.yaml @@ -26,6 +26,7 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade 
vertica.com/subcluster-name: pri + vertica.com/client-routing: "true" --- apiVersion: v1 kind: Service @@ -35,3 +36,4 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-svc: pri + vertica.com/client-routing: "true" diff --git a/tests/e2e-online-upgrade/online-upgrade-with-transient/30-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-with-transient/30-assert.yaml index b3be8a13f..f57d0bbd5 100644 --- a/tests/e2e-online-upgrade/online-upgrade-with-transient/30-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-with-transient/30-assert.yaml @@ -26,6 +26,7 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-name: transient + vertica.com/client-routing: "true" --- apiVersion: v1 kind: Service @@ -35,3 +36,4 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-svc: sec + vertica.com/client-routing: "true" diff --git a/tests/e2e-online-upgrade/online-upgrade-with-transient/40-assert.yaml b/tests/e2e-online-upgrade/online-upgrade-with-transient/40-assert.yaml index 7d17bb960..33492c8b6 100644 --- a/tests/e2e-online-upgrade/online-upgrade-with-transient/40-assert.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-with-transient/40-assert.yaml @@ -26,6 +26,7 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-name: transient + vertica.com/client-routing: "true" --- apiVersion: v1 kind: Service @@ -35,3 +36,4 @@ spec: selector: app.kubernetes.io/instance: v-base-upgrade vertica.com/subcluster-svc: pri + vertica.com/client-routing: "true" diff --git a/tests/e2e-online-upgrade/online-upgrade-with-transient/verify-vsql-access/base/verify-vsql-access.yaml b/tests/e2e-online-upgrade/online-upgrade-with-transient/verify-vsql-access/base/verify-vsql-access.yaml index 2fb96faf9..c1d574343 100644 --- a/tests/e2e-online-upgrade/online-upgrade-with-transient/verify-vsql-access/base/verify-vsql-access.yaml +++ b/tests/e2e-online-upgrade/online-upgrade-with-transient/verify-vsql-access/base/verify-vsql-access.yaml @@ -26,8 +26,6 @@ data: # Access to both pri and sec subclusters should be okay. One of them will # connect with node0003, which is the node for the transient subcluster. - # If any of the connections can't connect, the command will fail and we bail - # out because errexit is set. PRI_CONNECTION_NODE=$(vsql -h v-base-upgrade-pri -tAc "select node_name from current_session") SEC_CONNECTION_NODE=$(vsql -h v-base-upgrade-sec -tAc "select node_name from current_session")
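Reviewer note on the rebalance change in pkg/controllers/rebalanceshards_reconcile.go above: the new ScName field only narrows which pods findShardsToRebalance considers, and an empty name still means every subcluster. A minimal standalone sketch of that selection logic follows; it is not the operator's code, and podFact is a trimmed stand-in for the real PodFact struct with only the fields the filter reads.

package main

import "fmt"

// podFact is a trimmed stand-in for the operator's PodFact.
type podFact struct {
	subcluster         string
	isPodRunning       bool
	upNode             bool
	shardSubscriptions int
}

// findShardsToRebalance mirrors the filter added in this change: an empty
// scName means "all subclusters", otherwise only pods in that subcluster are
// considered. A subcluster is reported once when any of its up, running pods
// has no shard subscriptions.
func findShardsToRebalance(facts []podFact, scName string) []string {
	seen := map[string]bool{}
	scToRebalance := []string{}
	for _, pf := range facts {
		if (scName == "" || scName == pf.subcluster) && pf.isPodRunning && pf.upNode && pf.shardSubscriptions == 0 {
			if !seen[pf.subcluster] {
				seen[pf.subcluster] = true
				scToRebalance = append(scToRebalance, pf.subcluster)
			}
		}
	}
	return scToRebalance
}

func main() {
	facts := []podFact{
		{subcluster: "sc1", isPodRunning: true, upNode: true, shardSubscriptions: 0},
		{subcluster: "sc2", isPodRunning: true, upNode: true, shardSubscriptions: 0},
	}
	fmt.Println(findShardsToRebalance(facts, "sc2")) // [sc2]
	fmt.Println(findShardsToRebalance(facts, ""))    // [sc1 sc2]
}

This matches the new unit test above: passing the name of the second subcluster limits the rebalance to it, while passing "" keeps the old behavior.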
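Reviewer note on the requeue handling moved into Reconcile in pkg/controllers/verticadb_controller.go: spec.requeueTime is a count of seconds, so the conversion to RequeueAfter needs the time.Second factor (restored in the edit above). A minimal sketch of that override, using controller-runtime's ctrl.Result; applyRequeueTime and requeueTimeSeconds are illustrative names, not part of the operator.

package main

import (
	"fmt"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// applyRequeueTime overrides RequeueAfter when the CR asks for a fixed requeue
// interval. requeueTimeSeconds corresponds to spec.requeueTime; without the
// time.Second factor a value of 30 would mean 30 nanoseconds, not 30 seconds.
func applyRequeueTime(res ctrl.Result, requeueTimeSeconds int) ctrl.Result {
	if (res.Requeue || res.RequeueAfter > 0) && requeueTimeSeconds > 0 {
		res.Requeue = false
		res.RequeueAfter = time.Second * time.Duration(requeueTimeSeconds)
	}
	return res
}

func main() {
	// An actor asked for an immediate requeue; with requeueTime set to 30 the
	// controller waits 30s instead.
	fmt.Println(applyRequeueTime(ctrl.Result{Requeue: true}, 30).RequeueAfter) // 30s
}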