Fixes Istio proxy sidecar initialization issue #702

Merged
4 commits merged on Feb 14, 2024
5 changes: 5 additions & 0 deletions changes/unreleased/Fixed-20240213-111303.yaml
@@ -0,0 +1,5 @@
kind: Fixed
body: Resolves the issue when Istio proxy sidecar is injected as the first container.
time: 2024-02-13T11:13:03.768289581-04:00
custom:
Issue: "702"
19 changes: 11 additions & 8 deletions pkg/builder/builder_test.go
@@ -22,8 +22,8 @@ import (
. "github.com/onsi/gomega"
vapi "github.com/vertica/vertica-kubernetes/api/v1"
vmeta "github.com/vertica/vertica-kubernetes/pkg/meta"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/paths"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -271,9 +271,10 @@ var _ = Describe("builder", func() {
vdb.Annotations[vmeta.VClusterOpsAnnotation] = vmeta.VClusterOpsAnnotationTrue

c := buildPodSpec(vdb, &vdb.Spec.Subclusters[0])
inx := names.GetServerContainerIndex(vdb)
Ω(c.Containers[inx].ReadinessProbe.HTTPGet).ShouldNot(BeNil())
Ω(c.Containers[inx].LivenessProbe.HTTPGet).ShouldNot(BeNil())
svrCnt := vk8s.GetServerContainer(c.Containers)
Ω(svrCnt).ShouldNot(BeNil())
Ω(svrCnt.ReadinessProbe.HTTPGet).ShouldNot(BeNil())
Ω(svrCnt.LivenessProbe.HTTPGet).ShouldNot(BeNil())

vdb.Spec.ReadinessProbeOverride = &v1.Probe{
ProbeHandler: v1.ProbeHandler{
@@ -290,10 +291,12 @@ var _ = Describe("builder", func() {
},
}
c = buildPodSpec(vdb, &vdb.Spec.Subclusters[0])
Ω(c.Containers[inx].ReadinessProbe.HTTPGet).Should(BeNil())
Ω(c.Containers[inx].LivenessProbe.HTTPGet).Should(BeNil())
Ω(c.Containers[inx].ReadinessProbe.Exec).ShouldNot(BeNil())
Ω(c.Containers[inx].LivenessProbe.TCPSocket).ShouldNot(BeNil())
svrCnt = vk8s.GetServerContainer(c.Containers)
Ω(svrCnt).ShouldNot(BeNil())
Ω(svrCnt.ReadinessProbe.HTTPGet).Should(BeNil())
Ω(svrCnt.LivenessProbe.HTTPGet).Should(BeNil())
Ω(svrCnt.ReadinessProbe.Exec).ShouldNot(BeNil())
Ω(svrCnt.LivenessProbe.TCPSocket).ShouldNot(BeNil())
})

It("should not use canary query probe if using GSM", func() {
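
To see why the tests above switch from `Containers[inx]` to `vk8s.GetServerContainer`, consider a pod spec where Istio has injected its proxy at index 0. A minimal, self-contained illustration built on the hypothetical sketch above (the image tags are made up; only `istio-proxy` is Istio's usual default name):

```go
package main

import (
	"fmt"

	"github.com/vertica/vertica-kubernetes/pkg/vk8s"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	// A container list as it might look after Istio sidecar injection:
	// the proxy ends up first, ahead of the Vertica server container.
	containers := []corev1.Container{
		{Name: "istio-proxy", Image: "docker.io/istio/proxyv2:1.20.0"},
		{Name: "server", Image: "vertica/vertica-k8s:latest"},
	}

	// The old index-based access picks the injected proxy by mistake.
	fmt.Println(containers[0].Image) // prints the istio-proxy image

	// The name-based lookup still resolves the Vertica server container.
	if img, err := vk8s.GetServerImage(containers); err == nil {
		fmt.Println(img) // prints the Vertica server image
	}
}
```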
67 changes: 43 additions & 24 deletions pkg/controllers/vdb/obj_reconciler.go
@@ -33,6 +33,7 @@ import (
vmeta "github.com/vertica/vertica-kubernetes/pkg/meta"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/paths"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -412,13 +413,7 @@ func (o *ObjReconciler) reconcileSts(ctx context.Context, sc *vapi.Subcluster) (
err := o.VRec.Client.Get(ctx, nm, curSts)
if err != nil && errors.IsNotFound(err) {
o.Log.Info("Creating statefulset", "Name", nm, "Size", expSts.Spec.Replicas, "Image", expSts.Spec.Template.Spec.Containers[0].Image)
err = ctrl.SetControllerReference(o.Vdb, expSts, o.VRec.Scheme)
if err != nil {
return ctrl.Result{}, err
}
// Invalidate the pod facts cache since we are creating a new sts
o.PFacts.Invalidate()
return ctrl.Result{}, o.VRec.Client.Create(ctx, expSts)
return ctrl.Result{}, o.createSts(ctx, expSts)
}

// We can only remove pods if we have called remove node and done the
@@ -434,13 +429,20 @@ func (o *ObjReconciler) reconcileSts(ctx context.Context, sc *vapi.Subcluster) (
// We always preserve the image. This is done because during upgrade, the
// image is changed outside of this reconciler. It is done through a
// separate update to the sts.
i := names.GetServerContainerIndex(o.Vdb)
// It does not matter which is the first container,
// they have the same image
curImage := curSts.Spec.Template.Spec.Containers[0].Image
expSts.Spec.Template.Spec.Containers[i].Image = curImage
if o.Vdb.IsNMASideCarDeploymentEnabled() {
expSts.Spec.Template.Spec.Containers[names.GetNMAContainerIndex()].Image = curImage
//
// Both the NMA and server container have the same image, but the server
// container is guaranteed to be there for all deployments.
curImage, err := vk8s.GetServerImage(curSts.Spec.Template.Spec.Containers)
if err != nil {
return ctrl.Result{}, err
}
expSvrCnt := vk8s.GetServerContainer(expSts.Spec.Template.Spec.Containers)
if expSvrCnt == nil {
return ctrl.Result{}, fmt.Errorf("could not find server container in sts %s", expSts.Name)
}
expSvrCnt.Image = curImage
if expNMACnt := vk8s.GetNMAContainer(expSts.Spec.Template.Spec.Containers); expNMACnt != nil {
expNMACnt.Image = curImage
}

// Preserve scaling if told to do so. This is used when doing early
@@ -465,15 +467,33 @@ func (o *ObjReconciler) reconcileSts(ctx context.Context, sc *vapi.Subcluster) (
// drop the old sts and create a fresh one.
if isNMADeploymentDifferent(curSts, expSts) {
o.Log.Info("Dropping then recreating statefulset", "Name", expSts.Name)
if err := o.VRec.Client.Delete(ctx, curSts); err != nil {
return ctrl.Result{}, err
}
if err := ctrl.SetControllerReference(o.Vdb, expSts, o.VRec.Scheme); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, o.VRec.Client.Create(ctx, expSts)
return ctrl.Result{}, o.recreateSts(ctx, curSts, expSts)
}

return ctrl.Result{}, o.updateSts(ctx, curSts, expSts)
}

// recreateSts will drop then create the statefulset
func (o *ObjReconciler) recreateSts(ctx context.Context, curSts, expSts *appsv1.StatefulSet) error {
if err := o.VRec.Client.Delete(ctx, curSts); err != nil {
return err
}
return o.createSts(ctx, expSts)
}

// createSts will create a new sts. It assumes the statefulset doesn't already exist.
func (o *ObjReconciler) createSts(ctx context.Context, expSts *appsv1.StatefulSet) error {
err := ctrl.SetControllerReference(o.Vdb, expSts, o.VRec.Scheme)
if err != nil {
return err
}
// Invalidate the pod facts cache since we are creating a new sts
o.PFacts.Invalidate()
return o.VRec.Client.Create(ctx, expSts)
}

// updateSts will patch an existing statefulset.
func (o *ObjReconciler) updateSts(ctx context.Context, curSts, expSts *appsv1.StatefulSet) error {
// Update the sts by patching in fields that changed according to expSts.
// Due to the omission of default fields in expSts, curSts != expSts. We
// always send a patch request, then compare what came back against origSts
@@ -484,15 +504,14 @@ func (o *ObjReconciler) reconcileSts(ctx context.Context, sc *vapi.Subcluster) (
expSts.Spec.DeepCopyInto(&curSts.Spec)
curSts.Labels = expSts.Labels
if err := o.VRec.Client.Patch(ctx, curSts, patch); err != nil {
return ctrl.Result{}, err
return err
}
if !reflect.DeepEqual(curSts.Spec, origSts.Spec) {
o.Log.Info("Patching statefulset", "Name", expSts.Name, "Image", expSts.Spec.Template.Spec.Containers[0].Image)
// Invalidate the pod facts cache since we are about to change the sts
o.PFacts.Invalidate()
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
return nil
}

// isNMADeploymentDifferent will return true if one of the statefulsets have a
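
The body of `isNMADeploymentDifferent` is collapsed above. One plausible reading, given that the reconciler drops and recreates the statefulset when the NMA deployment style changes, is a check along these lines; this is a guess for illustration, not the repository's actual implementation:

```go
package vdb

import (
	"github.com/vertica/vertica-kubernetes/pkg/vk8s"
	appsv1 "k8s.io/api/apps/v1"
)

// Guessed sketch of the collapsed helper: treat the statefulsets as having
// different NMA deployments if exactly one of them carries an NMA sidecar.
func isNMADeploymentDifferent(curSts, expSts *appsv1.StatefulSet) bool {
	curHasNMA := vk8s.GetNMAContainer(curSts.Spec.Template.Spec.Containers) != nil
	expHasNMA := vk8s.GetNMAContainer(expSts.Spec.Template.Spec.Containers) != nil
	return curHasNMA != expHasNMA
}
```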
16 changes: 11 additions & 5 deletions pkg/controllers/vdb/offlineupgrade_reconciler.go
@@ -27,9 +27,9 @@ import (
verrors "github.com/vertica/vertica-kubernetes/pkg/errors"
"github.com/vertica/vertica-kubernetes/pkg/events"
"github.com/vertica/vertica-kubernetes/pkg/iter"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/vadmin"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/stopdb"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
@@ -252,8 +252,11 @@ func (o *OfflineUpgradeReconciler) checkForNewPods(ctx context.Context) (ctrl.Re
}
for i := range pods.Items {
pod := &pods.Items[i]
cnts := pod.Spec.Containers
if cnts[names.GetFirstContainerIndex()].Image == o.Vdb.Spec.Image {
cntImage, err := vk8s.GetServerImage(pod.Spec.Containers)
if err != nil {
return ctrl.Result{}, err
}
if cntImage == o.Vdb.Spec.Image {
foundPodWithNewImage = true
break
}
@@ -334,8 +337,11 @@ func (o *OfflineUpgradeReconciler) anyPodsRunningWithOldImage(ctx context.Contex
if errors.IsNotFound(err) {
continue
}
cnts := pod.Spec.Containers
if cnts[names.GetFirstContainerIndex()].Image != o.Vdb.Spec.Image {
cntImage, err := vk8s.GetServerImage(pod.Spec.Containers)
if err != nil {
return false, err
}
if cntImage != o.Vdb.Spec.Image {
return true, nil
}
}
5 changes: 3 additions & 2 deletions pkg/controllers/vdb/offlineupgrade_reconciler_test.go
@@ -26,6 +26,7 @@ import (
"github.com/vertica/vertica-kubernetes/pkg/iter"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/test"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
appsv1 "k8s.io/api/apps/v1"
ctrl "sigs.k8s.io/controller-runtime"
)
@@ -45,15 +46,15 @@ var _ = Describe("offlineupgrade_reconcile", func() {

sts := &appsv1.StatefulSet{}
Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image).ShouldNot(Equal(NewImage))
Expect(vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)).ShouldNot(Equal(NewImage))

updateVdbToCauseUpgrade(ctx, vdb, NewImage)

r, _, _ := createOfflineUpgradeReconciler(vdb)
Expect(r.Reconcile(ctx, &ctrl.Request{})).Should(Equal(ctrl.Result{Requeue: false, RequeueAfter: vdb.GetUpgradeRequeueTimeDuration()}))

Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image).Should(Equal(NewImage))
Expect(vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)).Should(Equal(NewImage))
})

It("should stop cluster during an upgrade", func() {
16 changes: 13 additions & 3 deletions pkg/controllers/vdb/onlineupgrade_reconciler.go
@@ -32,6 +32,7 @@ import (
vmeta "github.com/vertica/vertica-kubernetes/pkg/meta"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/vadmin"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -411,7 +412,10 @@ func (o *OnlineUpgradeReconciler) isMatchingSubclusterType(sts *appsv1.StatefulS
// drainSubcluster will reroute traffic away from a subcluster and wait for it to be idle.
// This is a no-op if the image has already been updated for the subcluster.
func (o *OnlineUpgradeReconciler) drainSubcluster(ctx context.Context, sts *appsv1.StatefulSet) (ctrl.Result, error) {
img := sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image
img, err := vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)
if err != nil {
return ctrl.Result{}, err
}

if img != o.Vdb.Spec.Image {
scName := sts.Labels[vmeta.SubclusterNameLabel]
@@ -542,7 +546,10 @@ func (o *OnlineUpgradeReconciler) waitForReadOnly(_ context.Context, sts *appsv1
if o.PFacts.countUpPrimaryNodes() != 0 {
return ctrl.Result{}, nil
}
newImage := sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image
newImage, err := vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)
if err != nil {
return ctrl.Result{}, err
}
// If all the pods that are running the old image are read-only we are done
// our wait.
if o.PFacts.countNotReadOnlyWithOldImage(newImage) == 0 {
@@ -661,7 +668,10 @@ func (o *OnlineUpgradeReconciler) cachePrimaryImages(ctx context.Context) error
for i := range stss.Items {
sts := &stss.Items[i]
if sts.Labels[vmeta.SubclusterTypeLabel] == vapi.PrimarySubcluster {
img := sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image
img, err := vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)
if err != nil {
return err
}
imageFound := false
for j := range o.PrimaryImages {
imageFound = o.PrimaryImages[j] == img
5 changes: 3 additions & 2 deletions pkg/controllers/vdb/onlineupgrade_reconciler_test.go
@@ -27,6 +27,7 @@ import (
vmeta "github.com/vertica/vertica-kubernetes/pkg/meta"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/test"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -333,9 +334,9 @@ var _ = Describe("onlineupgrade_reconcile", func() {

sts := &appsv1.StatefulSet{}
Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[0]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image).Should(Equal(NewImageName))
Expect(vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)).Should(Equal(NewImageName))
Expect(k8sClient.Get(ctx, names.GenStsName(vdb, &vdb.Spec.Subclusters[1]), sts)).Should(Succeed())
Expect(sts.Spec.Template.Spec.Containers[names.GetFirstContainerIndex()].Image).Should(Equal(NewImageName))
Expect(vk8s.GetServerImage(sts.Spec.Template.Spec.Containers)).Should(Equal(NewImageName))
})

It("should have an upgradeStatus set when it fails part way through", func() {
38 changes: 24 additions & 14 deletions pkg/controllers/vdb/podfacts.go
@@ -34,6 +34,7 @@ import (
vmeta "github.com/vertica/vertica-kubernetes/pkg/meta"
"github.com/vertica/vertica-kubernetes/pkg/names"
"github.com/vertica/vertica-kubernetes/pkg/paths"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
@@ -345,9 +346,15 @@ func (p *PodFacts) collectPodByStsIndex(ctx context.Context, vdb *vapi.VerticaDB
pf.isTransient, _ = strconv.ParseBool(pod.Labels[vmeta.SubclusterTransientLabel])
pf.isPendingDelete = podIndex >= sc.Size
// Let's just pick the first container image
pf.image = pod.Spec.Containers[names.GetFirstContainerIndex()].Image
pf.image, err = vk8s.GetServerImage(pod.Spec.Containers)
if err != nil {
return err
}
pf.hasDCTableAnnotations = p.checkDCTableAnnotations(pod)
pf.catalogPath = p.getCatalogPathFromPod(vdb, pod)
pf.catalogPath, err = p.getCatalogPathFromPod(vdb, pod)
if err != nil {
return err
}
pf.hasNMASidecar = builder.HasNMAContainer(&pod.Spec)
}

@@ -689,30 +696,33 @@ func (p *PodFacts) checkDCTableAnnotations(pod *corev1.Pod) bool {
}

// getCatalogPathFromPod will get the current catalog path from the pod
func (p *PodFacts) getCatalogPathFromPod(vdb *vapi.VerticaDB, pod *corev1.Pod) string {
func (p *PodFacts) getCatalogPathFromPod(vdb *vapi.VerticaDB, pod *corev1.Pod) (string, error) {
// both server and nma(if sidecar deployment enabled) have
// the catalog path env set so we can pick either to get it
index := names.GetFirstContainerIndex()
return p.getEnvValueFromPodWithDefault(pod, index,
builder.CatalogPathEnv, vdb.Spec.Local.GetCatalogPath())
// the catalog path env set so we just pick the server since that container
// will be available in all deployments.
cnt := vk8s.GetServerContainer(pod.Spec.Containers)
if cnt == nil {
return "", fmt.Errorf("could not find the server container in the pod %s", pod.Name)
}
return p.getEnvValueFromPodWithDefault(cnt,
builder.CatalogPathEnv, vdb.Spec.Local.GetCatalogPath()), nil
}

// getEnvValueFromPodWithDefault will get an environment value from the pod. A default
// value is used if the env var isn't found.
func (p *PodFacts) getEnvValueFromPodWithDefault(pod *corev1.Pod, cntIndex int,
func (p *PodFacts) getEnvValueFromPodWithDefault(cnt *corev1.Container,
envName, defaultValue string) string {
pathPrefix, ok := p.getEnvValueFromPod(pod, cntIndex, envName)
pathPrefix, ok := p.getEnvValueFromPod(cnt, envName)
if !ok {
return defaultValue
}
return pathPrefix
}

func (p *PodFacts) getEnvValueFromPod(pod *corev1.Pod, index int, envName string) (string, bool) {
c := pod.Spec.Containers[index]
for i := range c.Env {
if c.Env[i].Name == envName {
return c.Env[i].Value, true
func (p *PodFacts) getEnvValueFromPod(cnt *corev1.Container, envName string) (string, bool) {
for i := range cnt.Env {
if cnt.Env[i].Name == envName {
return cnt.Env[i].Value, true
}
}
return "", false
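
The `getEnvValueFromPod*` helpers above now take a concrete container instead of a pod plus index, which pairs naturally with `vk8s.GetServerContainer`. A small illustrative sketch of that pattern, using the `builder.CatalogPathEnv` constant referenced in the diff; the function below is a stand-in, not the repository's method:

```go
package main

import (
	"fmt"

	"github.com/vertica/vertica-kubernetes/pkg/builder"
	"github.com/vertica/vertica-kubernetes/pkg/vk8s"
	corev1 "k8s.io/api/core/v1"
)

// catalogPathFor mirrors the refactored pattern: find the server container by
// name, then read the catalog path env var from it, falling back to a default.
func catalogPathFor(pod *corev1.Pod, defaultPath string) (string, error) {
	cnt := vk8s.GetServerContainer(pod.Spec.Containers)
	if cnt == nil {
		return "", fmt.Errorf("could not find the server container in the pod %s", pod.Name)
	}
	for i := range cnt.Env {
		if cnt.Env[i].Name == builder.CatalogPathEnv {
			return cnt.Env[i].Value, nil
		}
	}
	return defaultPath, nil
}

func main() {
	pod := &corev1.Pod{}
	pod.Name = "example-pod"
	pod.Spec.Containers = []corev1.Container{
		{Name: "istio-proxy"},
		{Name: "server", Env: []corev1.EnvVar{{Name: builder.CatalogPathEnv, Value: "/catalog"}}},
	}
	path, err := catalogPathFor(pod, "/default/catalog")
	fmt.Println(path, err) // "/catalog" <nil>
}
```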
16 changes: 11 additions & 5 deletions pkg/controllers/vdb/restart_reconciler.go
@@ -38,6 +38,7 @@ import (
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/restartnode"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/startdb"
"github.com/vertica/vertica-kubernetes/pkg/vdbstatus"
"github.com/vertica/vertica-kubernetes/pkg/vk8s"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -597,8 +598,11 @@ func (r *RestartReconciler) makeResultForLivenessProbeWait(ctx context.Context)
}
return ctrl.Result{}, err
}
cnts := pod.Spec.Containers
probe := cnts[names.GetServerContainerIndexInSlice(cnts)].LivenessProbe
svrCnt := vk8s.GetServerContainer(pod.Spec.Containers)
if svrCnt == nil {
return ctrl.Result{}, fmt.Errorf("could not find server container for pod %s", pod.Name)
}
probe := svrCnt.LivenessProbe
if probe == nil {
// For backwards compatibility, if the probe isn't set, then we just
// return a simple requeue with exponential backoff.
@@ -621,9 +625,11 @@ func (r *RestartReconciler) isStartupProbeActive(ctx context.Context, nm types.N
}
// If the pod doesn't have a livenessProbe then we always return true. This
// can happen if we are in the middle of upgrading the operator.
cnts := pod.Spec.Containers
probe := cnts[names.GetServerContainerIndexInSlice(cnts)].LivenessProbe
if probe == nil {
svrCnt := vk8s.GetServerContainer(pod.Spec.Containers)
if svrCnt == nil {
return false, fmt.Errorf("could not find server container for pod %s", nm.Name)
}
if svrCnt.LivenessProbe == nil {
r.Log.Info("Pod doesn't have a livenessProbe. Okay to restart", "pod", nm)
return true, nil
}
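
The tail of `makeResultForLivenessProbeWait` is collapsed above; it presumably derives a requeue delay from the server container's liveness probe settings. A rough guess at that kind of computation, purely for illustration (the exact formula the operator uses is not shown in this diff):

```go
package sketch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
)

// requeueAfterForProbe is a guessed illustration, not the operator's code:
// wait roughly as long as the liveness probe could take to fail
// (period * failure threshold) before reconciling again.
func requeueAfterForProbe(probe *corev1.Probe) ctrl.Result {
	if probe == nil {
		// Backwards compatibility: no probe means a plain requeue with the
		// controller's default exponential backoff.
		return ctrl.Result{Requeue: true}
	}
	wait := time.Duration(probe.PeriodSeconds*probe.FailureThreshold) * time.Second
	return ctrl.Result{RequeueAfter: wait}
}
```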